Selecione suas preferências de cookies

Usamos cookies essenciais e ferramentas semelhantes que são necessárias para fornecer nosso site e serviços. Usamos cookies de desempenho para coletar estatísticas anônimas, para que possamos entender como os clientes usam nosso site e fazer as devidas melhorias. Cookies essenciais não podem ser desativados, mas você pode clicar em “Personalizar” ou “Recusar” para recusar cookies de desempenho.

Se você concordar, a AWS e terceiros aprovados também usarão cookies para fornecer recursos úteis do site, lembrar suas preferências e exibir conteúdo relevante, incluindo publicidade relevante. Para aceitar ou recusar todos os cookies não essenciais, clique em “Aceitar” ou “Recusar”. Para fazer escolhas mais detalhadas, clique em “Personalizar”.

Ingestão de dados de streaming usando o Kinesis - Amazon Redshift

Ingestão de dados de streaming usando o Kinesis

Esse procedimento demonstra como ingerir dados de uma transmissão do Kinesis chamada ev_station_data, que contém dados de consumo de diferentes estações de carregamento EV, no formato JSON. O esquema está bem definido. O exemplo mostra como armazenar os dados como JSON bruto, e também como converter os dados JSON em tipos de dados do Amazon Redshift à medida que são ingeridos.

Configuração do produtor

  1. Usando o Amazon Kinesis Data Streams, siga as etapas para criar uma transmissão chamada ev_station_data. Escolha On-demand (Sob demanda) para o Capacity mode (Modo de capacidade). Para obter mais informações, consulte Gerenciamento de transmissões via Console de Gerenciamento da AWS.

  2. O Gerador de dados do Amazon Kinesis pode ajudá-lo a gerar dados de teste para uso com sua transmissão. Siga as etapas detalhadas na ferramenta para começar, e use o seguinte modelo de dados para gerar seus dados:

    { "_id" : "{{random.uuid}}", "clusterID": "{{random.number( { "min":1, "max":50 } )}}", "connectionTime": "{{date.now("YYYY-MM-DD HH:mm:ss")}}", "kWhDelivered": "{{commerce.price}}", "stationID": "{{random.number( { "min":1, "max":467 } )}}", "spaceID": "{{random.word}}-{{random.number( { "min":1, "max":20 } )}}", "timezone": "America/Los_Angeles", "userID": "{{random.number( { "min":1000, "max":500000 } )}}" }

    Cada objeto JSON nos dados da transmissão tem as seguintes propriedades:

    { "_id": "12084f2f-fc41-41fb-a218-8cc1ac6146eb", "clusterID": "49", "connectionTime": "2022-01-31 13:17:15", "kWhDelivered": "74.00", "stationID": "421", "spaceID": "technologies-2", "timezone": "America/Los_Angeles", "userID": "482329" }

Configuração do Amazon Redshift

Essas etapas mostram como configurar a exibição materializada para ingerir dados.

  1. Crie um esquema externo para mapear os dados do Kinesis para um objeto do Redshift.

    CREATE EXTERNAL SCHEMA evdata FROM KINESIS IAM_ROLE 'arn:aws:iam::0123456789:role/redshift-streaming-role';

    Para obter mais informações sobre como configurar a função do IAM, consulte Conceitos básicos da ingestão de streaming do Amazon Kinesis Data Streams.

  2. Crie uma exibição materializada para consumir os dados da transmissão. Os exemplos a seguir mostram os dois métodos de definição de exibições materializadas para ingerir os dados de origem JSON.

    Primeiro, armazene registros da transmissão em formato SUPER semiestruturado. Neste exemplo, a origem JSON é armazenada no Redshift sem conversão para tipos do Redshift.

    CREATE MATERIALIZED VIEW ev_station_data AS SELECT approximate_arrival_timestamp, partition_key, shard_id, sequence_number, case when can_json_parse(kinesis_data) then json_parse(kinesis_data) else null end as payload, case when not can_json_parse(kinesis_data) then kinesis_data else null end as failed_payload FROM evdata."ev_station_data" ;

    Em contraste, na definição seguinte de exibição materializada, a exibição materializada tem um esquema definido no Redshift. A exibição materializada é distribuída no valor de UUID da transmissão e é classificada pelo valor approximatearrivaltimestamp.

    CREATE MATERIALIZED VIEW ev_station_data_extract DISTKEY(6) sortkey(1) AUTO REFRESH YES AS SELECT refresh_time, approximate_arrival_timestamp, partition_key, shard_id, sequence_number, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'_id',true)::character(36) as ID, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'clusterID',true)::varchar(30) as clusterID, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'connectionTime',true)::varchar(20) as connectionTime, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'kWhDelivered',true)::DECIMAL(10,2) as kWhDelivered, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'stationID',true)::DECIMAL(10,2) as stationID, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'spaceID',true)::varchar(100) as spaceID, json_extract_path_text(from_varbyte(kinesis_data, 'utf-8'),'timezone',true)::varchar(30)as timezone, json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'userID',true)::varchar(30) as userID FROM evdata."ev_station_data" WHERE LENGTH(kinesis_data) < 65355;

Consulte a transmissão

  1. Consulte a exibição materializada atualizada para obter estatísticas de uso.

    SELECT to_timestamp(connectionTime, 'YYYY-MM-DD HH24:MI:SS') as connectiontime ,SUM(kWhDelivered) AS Energy_Consumed ,count(distinct userID) AS #Users from ev_station_data_extract group by to_timestamp(connectionTime, 'YYYY-MM-DD HH24:MI:SS') order by 1 desc;
  2. Exibir resultados.

    connectiontime energy_consumed #users 2022-02-08 16:07:21+00 4139 10 2022-02-08 16:07:20+00 5571 10 2022-02-08 16:07:19+00 8697 20 2022-02-08 16:07:18+00 4408 10 2022-02-08 16:07:17+00 4257 10 2022-02-08 16:07:16+00 6861 10 2022-02-08 16:07:15+00 5643 10 2022-02-08 16:07:14+00 3677 10 2022-02-08 16:07:13+00 4673 10 2022-02-08 16:07:11+00 9689 20
PrivacidadeTermos do sitePreferências de cookies
© 2025, Amazon Web Services, Inc. ou suas afiliadas. Todos os direitos reservados.