Ingira dados de IoT de forma econômica diretamente no Amazon S3 usando o IoT Greengrass AWS - Recomendações da AWS

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Ingira dados de IoT de forma econômica diretamente no Amazon S3 usando o IoT Greengrass AWS

Criado por Sebastian Viviani (AWS) e Rizwan Syed () AWS

Ambiente: PoC ou piloto

Tecnologias: Análise; IoT

Workload: código aberto

AWSserviços: AWS IoT Greengrass; Amazon S3; Amazon Athena

Resumo

Esse padrão mostra como ingerir dados da Internet das Coisas (IoT) de forma econômica diretamente em um bucket do Amazon Simple Storage Service (Amazon S3) usando um AWS dispositivo IoT Greengrass versão 2. O dispositivo executa um componente personalizado que lê os dados da IoT e os salva em armazenamento persistente (ou seja, um disco ou volume local). Em seguida, o dispositivo compacta os dados de IoT em um arquivo Apache Parquet e carrega os dados periodicamente em um bucket do S3.

A quantidade e a velocidade dos dados de IoT que você ingere são limitadas apenas pelos recursos de hardware de borda e pela largura de banda da rede. É possível usar o Amazon Athena para analisar de forma econômica os dados ingeridos. O Athena suporta arquivos compactados do Apache Parquet e visualização de dados usando o Amazon Managed Grafana.

Pré-requisitos e limitações

Pré-requisitos

Limitações

  • Os dados nesse padrão não são enviados em tempo real para o bucket do S3. Há um período de atraso e você pode configurar esse período. Os dados são armazenados temporariamente no dispositivo de borda e, em seguida, carregados quando o período expira.

  • O SDK está disponível somente em Java, Node.js e Python.

Arquitetura

Pilha de tecnologias de destino

  • Amazon S3

  • AWS IoT Greengrass

  • MQTTcorretor

  • Componente gerenciador de fluxo

Arquitetura de destino

O diagrama a seguir mostra uma arquitetura projetada para ingerir dados de sensores de IoT e armazená-los em um bucket do S3.

Diagrama de arquitetura

O diagrama mostra o seguinte fluxo de trabalho:

  1. Várias atualizações de sensores (por exemplo, temperatura e válvula) são publicadas em um MQTT corretor local.

  2. O compressor de arquivos Parquet que está inscrito nesses sensores atualiza os tópicos e recebe essas atualizações.

  3. O compressor de arquivos Parquet armazena as atualizações localmente.

  4. Após o término do período, os arquivos armazenados são compactados em arquivos Parquet e transmitidos ao gerenciador de fluxo para serem carregados no bucket do S3 especificado.

  5. O gerenciador de fluxo carrega os arquivos Parquet para o bucket do S3.

Nota: o gerenciador de fluxo (StreamManager) é um componente gerenciado. Para exemplos de como exportar dados para o Amazon S3, consulte Stream manager na documentação do IoT AWS Greengrass. Você pode usar um MQTT corretor local como componente ou outro corretor como o Eclipse Mosquitto.

Ferramentas

AWSferramentas

  • O Amazon Athena é um serviço de consulta interativo que ajuda você a analisar dados diretamente no Amazon S3 usando o padrão. SQL

  • O Amazon Simple Storage Service (Amazon S3) é um serviço de armazenamento de objetos baseado na nuvem que ajuda você a armazenar, proteger e recuperar qualquer quantidade de dados.

  • AWSO IoT Greengrass é um serviço de nuvem e tempo de execução de ponta de IoT de código aberto que ajuda você a criar, implantar e gerenciar aplicativos de IoT em seus dispositivos.

Outras ferramentas

  • O Apache Parquet é um formato de arquivos de dados orientados por colunas de código aberto projetado para armazenamento e recuperação.

  • MQTT(Message Queuing Telemetry Transport) é um protocolo de mensagens leve projetado para dispositivos restritos.

Práticas recomendadas

Use o formato de partição correto para dados carregados

Não há requisitos específicos para os nomes do prefixo raiz no bucket do S3 (por exemplo, "myAwesomeDataSet/" ou"dataFromSource"), mas recomendamos que você use uma partição e um prefixo significativos para facilitar a compreensão da finalidade do conjunto de dados.

Também recomendamos que você use o particionamento correto no Amazon S3 para que as consultas sejam executadas de maneira ideal no conjunto de dados. No exemplo a seguir, os dados são particionados em HIVE formato para que a quantidade de dados digitalizados por cada consulta do Athena seja otimizada. Isso melhora o desempenho e reduz os custos.

s3://<ingestionBucket>/<rootPrefix>/year=YY/month=MM/day=DD/HHMM_<suffix>.parquet

Épicos

TarefaDescriçãoHabilidades necessárias

Criar um bucket do S3.

  1. Criar um bucket do S3 ou use um bucket existente.

  2. Crie um prefixo significativo para o bucket do S3 em que você deseja ingerir os dados de IoT (por exemplo, s3:\\<bucket>\<prefix>).

  3. Anote o seu prefixo para uso posterior.

Desenvolvedor de aplicativos

Adicione IAM permissões ao bucket do S3.

Para conceder aos usuários acesso de gravação ao bucket e ao prefixo do S3 que você criou anteriormente, adicione a seguinte IAM política à sua função do IoT AWS Greengrass:

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3DataUpload", "Effect": "Allow", "Action": [ "s3:List*", "s3:Put*" ], "Resource": [ "arn:aws:s3:::<ingestionBucket>", "arn:aws:s3:::<ingestionBucket>/<prefix>/*" ] } ] }

Para obter mais informações, consulte Criação de uma IAM política para acessar os recursos do Amazon S3 na documentação do Aurora.

Em seguida, atualize a política de recursos (se necessário) do bucket do S3 para permitir o acesso de gravação com os AWS principais corretos.

Desenvolvedor de aplicativos
TarefaDescriçãoHabilidades necessárias

Atualizar os componentes da fórmula.

Atualize a configuração do componente ao criar uma implantação com base no exemplo a seguir:

{ "region": "<region>", "parquet_period": <period>, "s3_bucket": "<s3Bucket>", "s3_key_prefix": "<s3prefix>" }

<region>Substitua por sua AWS região, <period> por seu intervalo periódico, <s3Bucket> por seu bucket S3 e por seu <s3prefix> prefixo.

Desenvolvedor de aplicativos

Criar o componente.

Execute um destes procedimentos:

  • Criar o componente.

  • Adicione o componente ao pipeline de CI/CD (se houver). Certifique-se de copiar o artefato do repositório de artefatos para o bucket de artefatos do AWS IoT Greengrass. Em seguida, crie ou atualize seu componente AWS do IoT Greengrass.

  • Adicione o MQTT corretor como um componente ou adicione-o manualmente posteriormente. Nota: essa decisão afeta o esquema de autenticação que você pode usar com o corretor. A adição manual de uma corretora separa a corretora do AWS IoT Greengrass e ativa qualquer esquema de autenticação suportado pela corretora. Os componentes do broker AWS fornecidos têm esquemas de autenticação predefinidos. Para obter mais informações, consulte MQTT3.1.1 broker (Moquette) e MQTT5 broker (). EMQX

Desenvolvedor de aplicativos

Atualize o MQTT cliente.

O código de amostra não usa autenticação porque o componente se conecta localmente ao corretor. Se seu cenário for diferente, atualize a seção MQTT do cliente conforme necessário. Além disso, faça o seguinte:

  1. Atualize os MQTT tópicos na assinatura.

  2. Atualize o analisador de MQTT mensagens conforme necessário, pois as mensagens de cada fonte podem ser diferentes.

Desenvolvedor de aplicativos
TarefaDescriçãoHabilidades necessárias

Atualize a implantação do dispositivo principal.

Se a implantação do dispositivo principal do AWS IoT Greengrass versão 2 já existir, revise a implantação. Se a implantação não existir, crie uma nova implantação.

Para dar ao componente o nome correto, atualize a configuração do gerenciador de logs para o novo componente (se necessário) com base no seguinte:

{ "logsUploaderConfiguration": { "systemLogsConfiguration": { ... }, "componentLogsConfigurationMap": { "<com.iot.ingest.parquet>": { "minimumLogLevel": "INFO", "diskSpaceLimit": "20", "diskSpaceLimitUnit": "MB", "deleteLogFileAfterCloudUpload": "false" } ... } }, "periodicUploadIntervalSec": "300" }

Por fim, conclua a revisão da implantação do seu dispositivo principal AWS do IoT Greengrass.

Desenvolvedor de aplicativos
TarefaDescriçãoHabilidades necessárias

Verifique os registros do volume do AWS IoT Greengrass.

Verifique o seguinte:

  • O MQTT cliente foi conectado com sucesso ao MQTT corretor local.

  • O MQTT cliente está inscrito nos tópicos corretos.

  • Mensagens de atualização do sensor estão chegando ao corretor sobre os MQTT tópicos.

  • A compressão do Parquet ocorre em todos os intervalos periódicos.

Desenvolvedor de aplicativos

Verificar o bucket do S3.

Verifique se os dados estão sendo carregados para o bucket do S3. Você pode ver os arquivos sendo enviados em cada período.

Você também pode verificar se os dados foram carregados no bucket do S3 ao consultar os dados na próxima seção.

Desenvolvedor de aplicativos
TarefaDescriçãoHabilidades necessárias

Criar banco de dados e tabela.

  1. Crie um banco de dados AWS Glue (se necessário).

  2. Crie uma tabela no AWS Glue manualmente ou executando um rastreador no AWS Glue.

Desenvolvedor de aplicativos

Conceda ao Athena o acesso aos dados.

  1. Atualize as permissões para permitir que o Athena acesse o bucket do S3. Para obter mais informações, consulte Acesso refinado a bancos de dados e tabelas no AWS Glue Data Catalog na documentação do Athena.

  2. Consulte a tabela em seu banco de dados.

Desenvolvedor de aplicativos

Solução de problemas

ProblemaSolução

MQTTo cliente não consegue se conectar

MQTTo cliente não consegue se inscrever

Valide as permissões no MQTT corretor. Se você tem um MQTT corretor deAWS, consulte MQTT3.1.1 corretor (Moquette) e MQTT5 corretor (). EMQX

Os arquivos Parquet não são criados

  • Verifique se os MQTT tópicos estão corretos.

  • Verifique se as MQTT mensagens dos sensores estão no formato correto.

Os objetos não são carregados no bucket do S3

  • Verifique se você tem conectividade com a Internet e com o endpoint.

  • Verifique se a política de recursos do seu bucket do S3 está correta.

  • Verifique as permissões para a função de dispositivo principal do AWS IoT Greengrass versão 2.

Recursos relacionados

Mais informações

Análise de custos

O cenário de análise de custos a seguir demonstra como a abordagem de ingestão de dados abordada nesse padrão pode impactar os custos de ingestão de dados na nuvem. AWS Os exemplos de preços nesse cenário são baseados nos preços no momento da publicação. Os preços estão sujeitos a alterações. Além disso, seus custos podem variar dependendo da sua AWS região, das cotas de AWS serviço e de outros fatores relacionados ao seu ambiente de nuvem.

Conjunto de sinais de entrada

Essa análise usa o seguinte conjunto de sinais de entrada como base para comparar os custos de ingestão de IoT com outras alternativas disponíveis.

Número de sinais

Frequência

Dados por sinal

125

25 Hz

8 bytes

Nesse cenário, o sistema recebe 125 sinais. Cada sinal tem 8 bytes e ocorre a cada 40 milissegundos (25 Hz). Esses sinais podem vir individualmente ou agrupados em um payload comum. Você tem a opção de dividir e empacotar esses sinais com base em suas necessidades. Você também pode determinar a latência. A latência consiste no período de tempo para receber, acumular e ingerir os dados.

Para fins de comparação, a operação de ingestão para esse cenário é baseada na us-east-1 AWS Região. A comparação de custos se aplica somente aos AWS serviços. Outros custos, como hardware ou conectividade, não são considerados na análise.

Comparações de custos

A tabela a seguir mostra o custo mensal em dólares americanos (USD) para cada método de ingestão.

Método

Custo mensal

AWSIoT * SiteWise

331,77 USD

AWSIoT SiteWise Edge com pacote de processamento de dados (mantendo todos os dados na borda)

200 USD

AWSRegras do IoT Core e do Amazon S3 para acessar dados brutos

84,54 USD

Compressão de arquivos Parquet na borda e upload para o Amazon S3

0,5 USD

*Os dados devem ser reduzidos para cumprir as Service Quotas. Isso significa que há alguma perda de dados com esse método.

Métodos alternativos

Esta seção mostra os custos equivalentes dos seguintes métodos alternativos:

  • AWSIoT SiteWise — Cada sinal deve ser carregado em uma mensagem individual. Portanto, o número total de mensagens por mês é 125 × 25 × 3600 × 24 × 30, ou 8,1 bilhões de mensagens por mês. No entanto, a AWS IoT SiteWise pode lidar com apenas 10 pontos de dados por segundo por propriedade. Supondo que a resolução dos dados seja reduzida para 10 Hz, o número de mensagens por mês é reduzido para 125 × 10 × 3600 × 24 × 30, ou 3,24 bilhões. Se você usar o componente de editor que agrupa as medidas em grupos de 10 (a 1 USD por milhão de mensagens), obterá um custo mensal de 324 USD por mês. Supondo que cada mensagem tenha 8 bytes (1 Kb/125), são 25,92 Gb de armazenamento de dados. Isso adiciona um custo mensal de 7,77 USD por mês. O custo total do primeiro mês é 331,77 USD e aumenta 7,77 USD a cada mês.

  • AWSIoT SiteWise Edge com pacote de processamento de dados, incluindo todos os modelos e sinais totalmente processados na borda (ou seja, sem ingestão na nuvem) — Você pode usar o pacote de processamento de dados como alternativa para reduzir custos e configurar todos os modelos que são calculados na borda. Isso pode funcionar apenas para armazenamento e visualização, mesmo que nenhum cálculo real seja realizado. Nesse caso, é necessário usar um hardware poderoso para o gateway de borda. Há um custo fixo de 200 USD por mês.

  • Ingestão direta para o AWS IoT Core MQTT e uma regra de IoT para armazenar os dados brutos no Amazon S3 — Supondo que todos os sinais sejam publicados em uma carga útil comum, o número total de mensagens publicadas no AWS IoT Core é 25 × 3600 × 24 × 30, ou 64,8 milhões por mês. Com 1 USD por milhão de mensagens, esse é um custo mensal de 64,8 USD por mês. Com 0,15 USD por milhão de ativações de regras e com uma regra por mensagem, isso adiciona um custo mensal de 19,44 USD por mês. Com um custo de 0,023 USD por Gb de armazenamento no Amazon S3, isso adiciona mais USD 1,5 por mês (aumentando a cada mês para refletir os novos dados). O custo total do primeiro mês é de 84,54 USD e aumenta em 1,5 a USD cada mês.

  • Compressão de dados na borda de um arquivo Parquet e upload para o Amazon S3 (método proposto): a taxa de compactação depende do tipo de dados. Com os mesmos dados industriais testadosMQTT, o total de dados de saída de um mês inteiro é de 1,2 Gb. Isso custa 0,03 USD por mês. As taxas de compressão (usando dados aleatórios) descritas em outros benchmarks são da ordem de 66 por cento (mais próximas do pior cenário). O total de dados é de 21 Gb e custa 0,5 USD por mês.

Gerador de arquivos Parquet

O exemplo de código a seguir mostra a estrutura de um gerador de arquivos Parquet escrito em Python. O exemplo de código serve apenas para fins ilustrativos e não funcionará se for colado em seu ambiente.

import queue import paho.mqtt.client as mqtt import pandas as pd #queue for decoupling the MQTT thread messageQueue = queue.Queue() client = mqtt.Client() streammanager = StreamManagerClient() def feederListener(topic, message): payload = { "topic" : topic, "payload" : message, } messageQueue.put_nowait(payload) def on_connect(client_instance, userdata, flags, rc): client.subscribe("#",qos=0) def on_message(client, userdata, message): feederListener(topic=str(message.topic), message=str(message.payload.decode("utf-8"))) filename = "tempfile.parquet" streamname = "mystream" destination_bucket= "amzn-s3-demo-bucket" keyname="mykey" period= 60 client.on_connect = on_connect client.on_message = on_message streammanager.create_message_stream( MessageStreamDefinition(name=streamname, strategy_on_full=StrategyOnFull.OverwriteOldestData) ) while True: try: message = messageQueue.get(timeout=myArgs.mqtt_timeout) except (queue.Empty): logger.warning("MQTT message reception timed out") currentTimestamp = getCurrentTime() if currentTimestamp >= nextUploadTimestamp: df = pd.DataFrame.from_dict(accumulator) df.to_parquet(filename) s3_export_task_definition = S3ExportTaskDefinition(input_url=filename, bucket=destination_bucket, key=key_name) streammanager.append_message(streamname, Util.validate_and_serialize_to_json_bytes(s3_export_task_definition)) accumulator = {} nextUploadTimestamp += period else: accumulator.append(message)