As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Ingira dados de IoT de forma econômica diretamente no Amazon S3 usando o IoT Greengrass AWS
Criado por Sebastian Viviani (AWS) e Rizwan Syed () AWS
Ambiente: PoC ou piloto | Tecnologias: Análise; IoT | Workload: código aberto |
AWSserviços: AWS IoT Greengrass; Amazon S3; Amazon Athena |
Resumo
Esse padrão mostra como ingerir dados da Internet das Coisas (IoT) de forma econômica diretamente em um bucket do Amazon Simple Storage Service (Amazon S3) usando um AWS dispositivo IoT Greengrass versão 2. O dispositivo executa um componente personalizado que lê os dados da IoT e os salva em armazenamento persistente (ou seja, um disco ou volume local). Em seguida, o dispositivo compacta os dados de IoT em um arquivo Apache Parquet e carrega os dados periodicamente em um bucket do S3.
A quantidade e a velocidade dos dados de IoT que você ingere são limitadas apenas pelos recursos de hardware de borda e pela largura de banda da rede. É possível usar o Amazon Athena para analisar de forma econômica os dados ingeridos. O Athena suporta arquivos compactados do Apache Parquet e visualização de dados usando o Amazon Managed Grafana.
Pré-requisitos e limitações
Pré-requisitos
Uma AWS conta ativa
Um gateway de ponta que roda no AWSIoT Greengrass versão 2 e coleta dados de sensores (as fontes de dados e o processo de coleta de dados estão além do escopo desse padrão, mas você pode usar praticamente qualquer tipo de dados de sensor. Esse padrão usa um MQTT
agente local com sensores ou gateways que publicam dados localmente.) AWSComponente , funções e dependências do IoT Greengrass SDK
Um componente do gerenciador de fluxo para carregar os dados no bucket do S3
AWSSDKpara Java
JavaScript, AWSSDKpara ou AWSSDKpara Python (Boto3) para executar o APIs
Limitações
Os dados nesse padrão não são enviados em tempo real para o bucket do S3. Há um período de atraso e você pode configurar esse período. Os dados são armazenados temporariamente no dispositivo de borda e, em seguida, carregados quando o período expira.
O SDK está disponível somente em Java, Node.js e Python.
Arquitetura
Pilha de tecnologias de destino
Amazon S3
AWS IoT Greengrass
MQTTcorretor
Componente gerenciador de fluxo
Arquitetura de destino
O diagrama a seguir mostra uma arquitetura projetada para ingerir dados de sensores de IoT e armazená-los em um bucket do S3.
O diagrama mostra o seguinte fluxo de trabalho:
Várias atualizações de sensores (por exemplo, temperatura e válvula) são publicadas em um MQTT corretor local.
O compressor de arquivos Parquet que está inscrito nesses sensores atualiza os tópicos e recebe essas atualizações.
O compressor de arquivos Parquet armazena as atualizações localmente.
Após o término do período, os arquivos armazenados são compactados em arquivos Parquet e transmitidos ao gerenciador de fluxo para serem carregados no bucket do S3 especificado.
O gerenciador de fluxo carrega os arquivos Parquet para o bucket do S3.
Nota: o gerenciador de fluxo (StreamManager
) é um componente gerenciado. Para exemplos de como exportar dados para o Amazon S3, consulte Stream manager na documentação do IoT AWS Greengrass. Você pode usar um MQTT corretor local como componente ou outro corretor como o Eclipse Mosquitto
Ferramentas
AWSferramentas
O Amazon Athena é um serviço de consulta interativo que ajuda você a analisar dados diretamente no Amazon S3 usando o padrão. SQL
O Amazon Simple Storage Service (Amazon S3) é um serviço de armazenamento de objetos baseado na nuvem que ajuda você a armazenar, proteger e recuperar qualquer quantidade de dados.
AWSO IoT Greengrass é um serviço de nuvem e tempo de execução de ponta de IoT de código aberto que ajuda você a criar, implantar e gerenciar aplicativos de IoT em seus dispositivos.
Outras ferramentas
O Apache Parquet
é um formato de arquivos de dados orientados por colunas de código aberto projetado para armazenamento e recuperação. MQTT(Message Queuing Telemetry Transport) é um protocolo de mensagens leve projetado para dispositivos restritos.
Práticas recomendadas
Use o formato de partição correto para dados carregados
Não há requisitos específicos para os nomes do prefixo raiz no bucket do S3 (por exemplo, "myAwesomeDataSet/"
ou"dataFromSource"
), mas recomendamos que você use uma partição e um prefixo significativos para facilitar a compreensão da finalidade do conjunto de dados.
Também recomendamos que você use o particionamento correto no Amazon S3 para que as consultas sejam executadas de maneira ideal no conjunto de dados. No exemplo a seguir, os dados são particionados em HIVE formato para que a quantidade de dados digitalizados por cada consulta do Athena seja otimizada. Isso melhora o desempenho e reduz os custos.
s3://<ingestionBucket>/<rootPrefix>/year=YY/month=MM/day=DD/HHMM_<suffix>.parquet
Épicos
Tarefa | Descrição | Habilidades necessárias |
---|---|---|
Criar um bucket do S3. |
| Desenvolvedor de aplicativos |
Adicione IAM permissões ao bucket do S3. | Para conceder aos usuários acesso de gravação ao bucket e ao prefixo do S3 que você criou anteriormente, adicione a seguinte IAM política à sua função do IoT AWS Greengrass:
Para obter mais informações, consulte Criação de uma IAM política para acessar os recursos do Amazon S3 na documentação do Aurora. Em seguida, atualize a política de recursos (se necessário) do bucket do S3 para permitir o acesso de gravação com os AWS principais corretos. | Desenvolvedor de aplicativos |
Tarefa | Descrição | Habilidades necessárias |
---|---|---|
Atualizar os componentes da fórmula. | Atualize a configuração do componente ao criar uma implantação com base no exemplo a seguir:
| Desenvolvedor de aplicativos |
Criar o componente. | Execute um destes procedimentos:
| Desenvolvedor de aplicativos |
Atualize o MQTT cliente. | O código de amostra não usa autenticação porque o componente se conecta localmente ao corretor. Se seu cenário for diferente, atualize a seção MQTT do cliente conforme necessário. Além disso, faça o seguinte:
| Desenvolvedor de aplicativos |
Tarefa | Descrição | Habilidades necessárias |
---|---|---|
Atualize a implantação do dispositivo principal. | Se a implantação do dispositivo principal do AWS IoT Greengrass versão 2 já existir, revise a implantação. Se a implantação não existir, crie uma nova implantação. Para dar ao componente o nome correto, atualize a configuração do gerenciador de logs para o novo componente (se necessário) com base no seguinte:
Por fim, conclua a revisão da implantação do seu dispositivo principal AWS do IoT Greengrass. | Desenvolvedor de aplicativos |
Tarefa | Descrição | Habilidades necessárias |
---|---|---|
Verifique os registros do volume do AWS IoT Greengrass. | Verifique o seguinte:
| Desenvolvedor de aplicativos |
Verificar o bucket do S3. | Verifique se os dados estão sendo carregados para o bucket do S3. Você pode ver os arquivos sendo enviados em cada período. Você também pode verificar se os dados foram carregados no bucket do S3 ao consultar os dados na próxima seção. | Desenvolvedor de aplicativos |
Tarefa | Descrição | Habilidades necessárias |
---|---|---|
Criar banco de dados e tabela. |
| Desenvolvedor de aplicativos |
Conceda ao Athena o acesso aos dados. |
| Desenvolvedor de aplicativos |
Solução de problemas
Problema | Solução |
---|---|
MQTTo cliente não consegue se conectar |
|
MQTTo cliente não consegue se inscrever | Valide as permissões no MQTT corretor. Se você tem um MQTT corretor deAWS, consulte MQTT3.1.1 corretor (Moquette) e MQTT5 corretor (). EMQX |
Os arquivos Parquet não são criados |
|
Os objetos não são carregados no bucket do S3 |
|
Recursos relacionados
DataFrame
(Documentação do Pandas) Documentação do Apache Parquet
(documentação do Parquet) Desenvolva componentes AWS do IoT Greengrass (Guia do desenvolvedor do IoT AWS Greengrass, versão 2)
Implante componentes AWS do IoT Greengrass em dispositivos (Guia do desenvolvedor do IoT AWS Greengrass, versão 2)
Interaja com dispositivos locais de IoT (Guia do Desenvolvedor do IoT AWS Greengrass, versão 2)
MQTT3.1.1 broker (Moquette) (Guia do desenvolvedor do AWS IoT Greengrass, versão 2)
MQTT5 broker (EMQX) (Guia do desenvolvedor AWS do IoT Greengrass, versão 2)
Mais informações
Análise de custos
O cenário de análise de custos a seguir demonstra como a abordagem de ingestão de dados abordada nesse padrão pode impactar os custos de ingestão de dados na nuvem. AWS Os exemplos de preços nesse cenário são baseados nos preços no momento da publicação. Os preços estão sujeitos a alterações. Além disso, seus custos podem variar dependendo da sua AWS região, das cotas de AWS serviço e de outros fatores relacionados ao seu ambiente de nuvem.
Conjunto de sinais de entrada
Essa análise usa o seguinte conjunto de sinais de entrada como base para comparar os custos de ingestão de IoT com outras alternativas disponíveis.
Número de sinais | Frequência | Dados por sinal |
125 | 25 Hz | 8 bytes |
Nesse cenário, o sistema recebe 125 sinais. Cada sinal tem 8 bytes e ocorre a cada 40 milissegundos (25 Hz). Esses sinais podem vir individualmente ou agrupados em um payload comum. Você tem a opção de dividir e empacotar esses sinais com base em suas necessidades. Você também pode determinar a latência. A latência consiste no período de tempo para receber, acumular e ingerir os dados.
Para fins de comparação, a operação de ingestão para esse cenário é baseada na us-east-1
AWS Região. A comparação de custos se aplica somente aos AWS serviços. Outros custos, como hardware ou conectividade, não são considerados na análise.
Comparações de custos
A tabela a seguir mostra o custo mensal em dólares americanos (USD) para cada método de ingestão.
Método | Custo mensal |
AWSIoT * SiteWise | 331,77 USD |
AWSIoT SiteWise Edge com pacote de processamento de dados (mantendo todos os dados na borda) | 200 USD |
AWSRegras do IoT Core e do Amazon S3 para acessar dados brutos | 84,54 USD |
Compressão de arquivos Parquet na borda e upload para o Amazon S3 | 0,5 USD |
*Os dados devem ser reduzidos para cumprir as Service Quotas. Isso significa que há alguma perda de dados com esse método.
Métodos alternativos
Esta seção mostra os custos equivalentes dos seguintes métodos alternativos:
AWSIoT SiteWise — Cada sinal deve ser carregado em uma mensagem individual. Portanto, o número total de mensagens por mês é 125 × 25 × 3600 × 24 × 30, ou 8,1 bilhões de mensagens por mês. No entanto, a AWS IoT SiteWise pode lidar com apenas 10 pontos de dados por segundo por propriedade. Supondo que a resolução dos dados seja reduzida para 10 Hz, o número de mensagens por mês é reduzido para 125 × 10 × 3600 × 24 × 30, ou 3,24 bilhões. Se você usar o componente de editor que agrupa as medidas em grupos de 10 (a 1 USD por milhão de mensagens), obterá um custo mensal de 324 USD por mês. Supondo que cada mensagem tenha 8 bytes (1 Kb/125), são 25,92 Gb de armazenamento de dados. Isso adiciona um custo mensal de 7,77 USD por mês. O custo total do primeiro mês é 331,77 USD e aumenta 7,77 USD a cada mês.
AWSIoT SiteWise Edge com pacote de processamento de dados, incluindo todos os modelos e sinais totalmente processados na borda (ou seja, sem ingestão na nuvem) — Você pode usar o pacote de processamento de dados como alternativa para reduzir custos e configurar todos os modelos que são calculados na borda. Isso pode funcionar apenas para armazenamento e visualização, mesmo que nenhum cálculo real seja realizado. Nesse caso, é necessário usar um hardware poderoso para o gateway de borda. Há um custo fixo de 200 USD por mês.
Ingestão direta para o AWS IoT Core MQTT e uma regra de IoT para armazenar os dados brutos no Amazon S3 — Supondo que todos os sinais sejam publicados em uma carga útil comum, o número total de mensagens publicadas no AWS IoT Core é 25 × 3600 × 24 × 30, ou 64,8 milhões por mês. Com 1 USD por milhão de mensagens, esse é um custo mensal de 64,8 USD por mês. Com 0,15 USD por milhão de ativações de regras e com uma regra por mensagem, isso adiciona um custo mensal de 19,44 USD por mês. Com um custo de 0,023 USD por Gb de armazenamento no Amazon S3, isso adiciona mais USD 1,5 por mês (aumentando a cada mês para refletir os novos dados). O custo total do primeiro mês é de 84,54 USD e aumenta em 1,5 a USD cada mês.
Compressão de dados na borda de um arquivo Parquet e upload para o Amazon S3 (método proposto): a taxa de compactação depende do tipo de dados. Com os mesmos dados industriais testadosMQTT, o total de dados de saída de um mês inteiro é de 1,2 Gb. Isso custa 0,03 USD por mês. As taxas de compressão (usando dados aleatórios) descritas em outros benchmarks são da ordem de 66 por cento (mais próximas do pior cenário). O total de dados é de 21 Gb e custa 0,5 USD por mês.
Gerador de arquivos Parquet
O exemplo de código a seguir mostra a estrutura de um gerador de arquivos Parquet escrito em Python. O exemplo de código serve apenas para fins ilustrativos e não funcionará se for colado em seu ambiente.
import queue import paho.mqtt.client as mqtt import pandas as pd #queue for decoupling the MQTT thread messageQueue = queue.Queue() client = mqtt.Client() streammanager = StreamManagerClient() def feederListener(topic, message): payload = { "topic" : topic, "payload" : message, } messageQueue.put_nowait(payload) def on_connect(client_instance, userdata, flags, rc): client.subscribe("#",qos=0) def on_message(client, userdata, message): feederListener(topic=str(message.topic), message=str(message.payload.decode("utf-8"))) filename = "tempfile.parquet" streamname = "mystream" destination_bucket= "amzn-s3-demo-bucket" keyname="mykey" period= 60 client.on_connect = on_connect client.on_message = on_message streammanager.create_message_stream( MessageStreamDefinition(name=streamname, strategy_on_full=StrategyOnFull.OverwriteOldestData) ) while True: try: message = messageQueue.get(timeout=myArgs.mqtt_timeout) except (queue.Empty): logger.warning("MQTT message reception timed out") currentTimestamp = getCurrentTime() if currentTimestamp >= nextUploadTimestamp: df = pd.DataFrame.from_dict(accumulator) df.to_parquet(filename) s3_export_task_definition = S3ExportTaskDefinition(input_url=filename, bucket=destination_bucket, key=key_name) streammanager.append_message(streamname, Util.validate_and_serialize_to_json_bytes(s3_export_task_definition)) accumulator = {} nextUploadTimestamp += period else: accumulator.append(message)