Ingérez de manière rentable des données IoT directement dans Amazon S3 à l'aide d'IoT Greengrass AWS - Recommandations AWS

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Ingérez de manière rentable des données IoT directement dans Amazon S3 à l'aide d'IoT Greengrass AWS

Créé par Sebastian Viviani (AWS) et Rizwan Syed () AWS

Environnement : PoC ou pilote

Technologies : Analytique ; IoT

Charge de travail : Open source

AWSservices : AWS IoT Greengrass ; Amazon S3 ; Amazon Athena

Récapitulatif

Ce modèle vous montre comment ingérer de manière rentable des données de l'Internet des objets (IoT) directement dans un bucket Amazon Simple Storage Service (Amazon S3) à l'aide d'un appareil IoT AWS Greengrass version 2. L'appareil exécute un composant personnalisé qui lit les données IoT et les enregistre dans un stockage persistant (c'est-à-dire un disque ou un volume local). L'appareil compresse ensuite les données IoT dans un fichier Apache Parquet et les télécharge périodiquement dans un compartiment S3.

La quantité et la vitesse des données IoT que vous ingérez ne sont limitées que par les capacités de votre matériel de pointe et la bande passante de votre réseau. Vous pouvez utiliser Amazon Athena pour analyser de manière rentable les données ingérées. Athena prend en charge les fichiers Apache Parquet compressés et la visualisation des données à l'aide d'Amazon Managed Grafana.

Conditions préalables et limitations

Prérequis

Limites

  • Les données de ce modèle ne sont pas téléchargées en temps réel dans le compartiment S3. Il existe un délai, que vous pouvez configurer. Les données sont temporairement mises en mémoire tampon dans le périphérique périphérique, puis téléchargées une fois la période expirée.

  • Il n'SDKest disponible qu'en Java, Node.js et Python.

Architecture

Pile technologique cible

  • Amazon S3

  • AWS IoT Greengrass

  • MQTTcourtier

  • Composant du gestionnaire de flux

Architecture cible

Le schéma suivant montre une architecture conçue pour ingérer les données des capteurs IoT et les stocker dans un compartiment S3.

Diagramme d'architecture

Le schéma suivant illustre le flux de travail suivant :

  1. Les mises à jour de plusieurs capteurs (par exemple, la température et les vannes) sont publiées auprès d'un MQTT courtier local.

  2. Le compresseur de fichiers Parquet abonné à ces capteurs met à jour les rubriques et reçoit ces mises à jour.

  3. Le compresseur de fichiers Parquet stocke les mises à jour localement.

  4. Une fois la période écoulée, les fichiers stockés sont compressés dans des fichiers Parquet et transmis au gestionnaire de flux pour être téléchargés dans le compartiment S3 spécifié.

  5. Le gestionnaire de flux télécharge les fichiers Parquet dans le compartiment S3.

Remarque : Le gestionnaire de flux (StreamManager) est un composant géré. Pour des exemples d'exportation de données vers Amazon S3, consultez Stream manager dans la documentation AWS IoT Greengrass. Vous pouvez utiliser un MQTT courtier local comme composant ou un autre courtier comme Eclipse Mosquitto.

Outils

AWSoutils

  • Amazon Athena est un service de requête interactif qui vous permet d'analyser les données directement dans Amazon S3 en utilisant la norme. SQL

  • Amazon Simple Storage Service (Amazon S3) est un service de stockage d'objets basé sur le cloud qui vous permet de stocker, de protéger et de récupérer n'importe quel volume de données.

  • AWSIoT Greengrass est un environnement d'exécution IoT Edge et un service cloud open source qui vous aident à créer, déployer et gérer des applications IoT sur vos appareils.

Autres outils

  • Apache Parquet est un format de fichier de données open source orienté colonne conçu pour le stockage et la récupération.

  • MQTT(Message Queuing Telemetry Transport) est un protocole de messagerie léger conçu pour les appareils limités.

Bonnes pratiques

Utilisez le bon format de partition pour les données téléchargées

Il n'existe aucune exigence spécifique concernant les noms de préfixes racines dans le compartiment S3 (par exemple, "myAwesomeDataSet/" ou"dataFromSource"), mais nous vous recommandons d'utiliser une partition et un préfixe significatifs afin de comprendre facilement l'objectif de l'ensemble de données.

Nous vous recommandons également d'utiliser le bon partitionnement dans Amazon S3 afin que les requêtes s'exécutent de manière optimale sur l'ensemble de données. Dans l'exemple suivant, les données sont partitionnées au HIVE format de manière à optimiser la quantité de données numérisées par chaque requête Athena. Cela améliore les performances et réduit les coûts.

s3://<ingestionBucket>/<rootPrefix>/year=YY/month=MM/day=DD/HHMM_<suffix>.parquet

Épopées

TâcheDescriptionCompétences requises

Créez un compartiment S3.

  1. Créez un compartiment S3 ou utilisez un compartiment existant.

  2. Créez un préfixe significatif pour le compartiment S3 dans lequel vous souhaitez ingérer les données IoT (par exemple,s3:\\<bucket>\<prefix>).

  3. Enregistrez votre préfixe pour une utilisation ultérieure.

Développeur d'applications

Ajoutez IAM des autorisations au compartiment S3.

Pour accorder aux utilisateurs un accès en écriture au compartiment S3 et au préfixe que vous avez créés précédemment, ajoutez la IAM politique suivante à votre rôle AWS IoT Greengrass :

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3DataUpload", "Effect": "Allow", "Action": [ "s3:List*", "s3:Put*" ], "Resource": [ "arn:aws:s3:::<ingestionBucket>", "arn:aws:s3:::<ingestionBucket>/<prefix>/*" ] } ] }

Pour plus d'informations, consultez la section Création d'une IAM politique d'accès aux ressources Amazon S3 dans la documentation Aurora.

Ensuite, mettez à jour la politique de ressources (si nécessaire) pour le compartiment S3 afin d'autoriser l'accès en écriture avec AWS les principes appropriés.

Développeur d'applications
TâcheDescriptionCompétences requises

Mettez à jour la recette du composant.

Mettez à jour la configuration des composants lorsque vous créez un déploiement en vous basant sur l'exemple suivant :

{ "region": "<region>", "parquet_period": <period>, "s3_bucket": "<s3Bucket>", "s3_key_prefix": "<s3prefix>" }

<region>Remplacez-le par votre AWS région, <period> par votre intervalle périodique, <s3Bucket> par votre compartiment S3 et <s3prefix> par votre préfixe.

Développeur d'applications

Créez le composant.

Effectuez l’une des actions suivantes :

  • Créez le composant.

  • Ajoutez le composant au pipeline CI/CD (s'il en existe un). Veillez à copier l'artefact depuis le référentiel d'artefacts vers le bucket d'artefacts IoT AWS Greengrass. Créez ou mettez à jour votre composant AWS IoT Greengrass.

  • Ajoutez le MQTT broker en tant que composant ou ajoutez-le manuellement ultérieurement. Remarque : Cette décision affecte le schéma d'authentification que vous pouvez utiliser avec le courtier. L'ajout manuel d'un courtier dissocie le courtier d'IoT AWS Greengrass et active tout schéma d'authentification pris en charge par le courtier. Les composants du courtier AWS fournis ont des schémas d'authentification prédéfinis. Pour plus d'informations, consultez MQTT3.1.1 broker (Moquette) et MQTT5 broker (EMQX).

Développeur d'applications

Mettez à jour le MQTT client.

L'exemple de code n'utilise pas l'authentification car le composant se connecte localement au courtier. Si votre scénario est différent, mettez à jour la section MQTT client selon vos besoins. Procédez également comme suit :

  1. Mettez à jour les MQTT rubriques de l'abonnement.

  2. Mettez à jour l'analyseur de MQTT messages selon les besoins, car les messages provenant de chaque source peuvent être différents.

Développeur d'applications
TâcheDescriptionCompétences requises

Mettez à jour le déploiement du périphérique principal.

Si le déploiement du dispositif principal AWS IoT Greengrass version 2 existe déjà, revoyez le déploiement. Si le déploiement n'existe pas, créez-en un nouveau.

Pour attribuer le nom correct au composant, mettez à jour la configuration du gestionnaire de journaux pour le nouveau composant (si nécessaire) en fonction des éléments suivants :

{ "logsUploaderConfiguration": { "systemLogsConfiguration": { ... }, "componentLogsConfigurationMap": { "<com.iot.ingest.parquet>": { "minimumLogLevel": "INFO", "diskSpaceLimit": "20", "diskSpaceLimitUnit": "MB", "deleteLogFileAfterCloudUpload": "false" } ... } }, "periodicUploadIntervalSec": "300" }

Enfin, terminez la révision du déploiement de votre appareil AWS IoT Greengrass principal.

Développeur d'applications
TâcheDescriptionCompétences requises

Consultez les journaux pour le volume AWS IoT Greengrass.

Vérifiez les points suivants :

  • Le MQTT client est correctement connecté au MQTT courtier local.

  • Le MQTT client est abonné aux bons sujets.

  • Des messages de mise à jour des capteurs sont envoyés au courtier à ce MQTT sujet.

  • La compression du parquet se produit à chaque intervalle périodique.

Développeur d'applications

Vérifiez le compartiment S3.

Vérifiez si les données sont téléchargées dans le compartiment S3. Vous pouvez voir les fichiers téléchargés à chaque période.

Vous pouvez également vérifier si les données sont téléchargées dans le compartiment S3 en interrogeant les données dans la section suivante.

Développeur d'applications
TâcheDescriptionCompétences requises

Créez une base de données et une table.

  1. Créez une base de données AWS Glue (si nécessaire).

  2. Créez un tableau dans AWS Glue manuellement ou en exécutant un crawler dans AWS Glue.

Développeur d'applications

Accordez à Athéna l'accès aux données.

  1. Mettez à jour les autorisations pour permettre à Athena d'accéder au compartiment S3. Pour plus d'informations, consultez la section Accès détaillé aux bases de données et aux tables dans le catalogue de données AWS Glue de la documentation Athena.

  2. Interrogez la table de votre base de données.

Développeur d'applications

Résolution des problèmes

ProblèmeSolution

MQTTle client ne parvient pas à se connecter

MQTTle client ne parvient pas à s'abonner

Validez les autorisations sur le MQTT courtier. Si vous avez un MQTT courtierAWS, voir MQTT3.1.1 broker (Moquette) et MQTT5 broker (EMQX).

Les fichiers de parquet ne sont pas créés

  • Vérifiez que les MQTT sujets sont corrects.

  • Vérifiez que le format MQTT des messages provenant des capteurs est correct.

Les objets ne sont pas chargés dans le compartiment S3

  • Vérifiez que vous disposez d'une connectivité Internet et d'une connectivité aux terminaux.

  • Vérifiez que la politique de ressources de votre compartiment S3 est correcte.

  • Vérifiez les autorisations pour le rôle principal de l'appareil AWS IoT Greengrass version 2.

Ressources connexes

Informations supplémentaires

Analyse des coûts

Le scénario d'analyse des coûts suivant montre comment l'approche d'ingestion de données couverte par ce modèle peut avoir un impact sur les coûts d'ingestion de données dans le AWS cloud. Les exemples de tarification de ce scénario sont basés sur les prix au moment de la publication. Les prix sont susceptibles d’être modifiés. En outre, vos coûts peuvent varier en fonction de votre AWS région, des quotas de AWS service et d'autres facteurs liés à votre environnement cloud.

Set de signaux d'entrée

Cette analyse utilise l'ensemble de signaux d'entrée suivant comme base pour comparer les coûts d'ingestion de l'IoT avec les autres alternatives disponibles.

Nombre de signaux

Frequency (Fréquence)

Données par signal

125

25 Hz

8 bytes

Dans ce scénario, le système reçoit 125 signaux. Chaque signal est de 8 octets et se produit toutes les 40 millisecondes (25 Hz). Ces signaux peuvent être fournis individuellement ou regroupés dans une charge utile commune. Vous avez la possibilité de diviser et de regrouper ces signaux en fonction de vos besoins. Vous pouvez également déterminer le temps de latence. La latence correspond à la période pendant laquelle les données sont reçues, accumulées et ingérées.

À des fins de comparaison, l'opération d'ingestion pour ce scénario est basée dans la us-east-1 AWS Région. La comparaison des coûts s'applique uniquement AWS aux services. Les autres coûts, tels que le matériel ou la connectivité, ne sont pas pris en compte dans l'analyse.

Comparaisons de coûts

Le tableau suivant indique le coût mensuel en dollars américains (USD) pour chaque méthode d'ingestion.

Method

Coût mensuel

AWSIoT SiteWise *

331,77 USD

AWSIoT SiteWise Edge avec pack de traitement des données (maintien de toutes les données à la périphérie)

200 USD

AWSRègles IoT Core et Amazon S3 pour accéder aux données brutes

84,54 USD

Compression de fichiers Parquet à la périphérie et téléchargement vers Amazon S3

0,5 USD

*Les données doivent être sous-échantillonnées pour respecter les quotas de service. Cela signifie qu'il y a une certaine perte de données avec cette méthode.

Méthodes alternatives

Cette section indique les coûts équivalents pour les méthodes alternatives suivantes :

  • AWSIoT SiteWise — Chaque signal doit être transféré dans un message individuel. Par conséquent, le nombre total de messages par mois est de 125 × 25 × 3600 × 24 × 30, soit 8,1 milliards de messages par mois. Cependant, AWS l'IoT ne SiteWise peut gérer que 10 points de données par seconde et par propriété. En supposant que les données soient sous-échantillonnées à 10 Hz, le nombre de messages par mois est réduit à 125 × 10 × 3600 × 24 × 30, soit 3,24 milliards. Si vous utilisez le composant éditeur qui regroupe les mesures par groupes de 10 (à raison de 1 USD par million de messages), vous obtenez un coût mensuel de 324 USD par mois. En supposant que chaque message est de 8 octets (1 Kb/125), cela représente 25,92 Go de stockage de données. Cela ajoute un coût mensuel de 7,77€ USD par mois. Le coût total pour le premier mois est de 331,77€ USD et augmente de 7,77€ chaque USD mois.

  • AWSIoT SiteWise Edge avec pack de traitement des données, incluant tous les modèles et signaux entièrement traités à la périphérie (c'est-à-dire sans ingestion du cloud) : vous pouvez utiliser le pack de traitement des données comme alternative pour réduire les coûts et configurer tous les modèles calculés à la périphérie. Cela peut fonctionner uniquement pour le stockage et la visualisation, même si aucun calcul réel n'est effectué. Dans ce cas, il est nécessaire d'utiliser un matériel puissant pour la passerelle Edge. Il y a un coût fixe de 200€ USD par mois.

  • Intégration directe à AWS IoT Core par MQTT une règle IoT pour stocker les données brutes dans Amazon S3 — En supposant que tous les signaux soient publiés dans une charge utile commune, le nombre total de messages publiés sur AWS IoT Core est de 25 × 3600 × 24 × 30, soit 64,8 millions par mois. À 1 USD par million de messages, cela représente un coût mensuel de 64,8€ USD par mois. À 0,15 USD par million d'activations de règles et à raison d'une règle par message, cela ajoute un coût mensuel de 19,44€ USD par mois. Au coût de 0,023 USD par Go de stockage dans Amazon S3, cela représente 1,5 USD % de plus par mois (augmentation mensuelle pour refléter les nouvelles données). Le coût total pour le premier mois est de 84,54€ USD et augmente de 1,5 % USD chaque mois.

  • Compression des données en périphérie dans un fichier Parquet et chargement vers Amazon S3 (méthode proposée) : le taux de compression dépend du type de données. Avec les mêmes données industrielles testéesMQTT, le total des données de sortie pour un mois complet est de 1,2 Go. Cela coûte 0,03€ USD par mois. Les taux de compression (basés sur des données aléatoires) décrits dans d'autres benchmarks sont de l'ordre de 66 % (ce qui est plus proche du pire des scénarios). Le total des données est de 21 Go et coûte 0,5 USD par mois.

Générateur de fichiers pour parquet

L'exemple de code suivant montre la structure d'un générateur de fichiers Parquet écrit en Python. L'exemple de code est fourni à titre d'illustration uniquement et ne fonctionnera pas s'il est collé dans votre environnement.

import queue import paho.mqtt.client as mqtt import pandas as pd #queue for decoupling the MQTT thread messageQueue = queue.Queue() client = mqtt.Client() streammanager = StreamManagerClient() def feederListener(topic, message): payload = { "topic" : topic, "payload" : message, } messageQueue.put_nowait(payload) def on_connect(client_instance, userdata, flags, rc): client.subscribe("#",qos=0) def on_message(client, userdata, message): feederListener(topic=str(message.topic), message=str(message.payload.decode("utf-8"))) filename = "tempfile.parquet" streamname = "mystream" destination_bucket= "amzn-s3-demo-bucket" keyname="mykey" period= 60 client.on_connect = on_connect client.on_message = on_message streammanager.create_message_stream( MessageStreamDefinition(name=streamname, strategy_on_full=StrategyOnFull.OverwriteOldestData) ) while True: try: message = messageQueue.get(timeout=myArgs.mqtt_timeout) except (queue.Empty): logger.warning("MQTT message reception timed out") currentTimestamp = getCurrentTime() if currentTimestamp >= nextUploadTimestamp: df = pd.DataFrame.from_dict(accumulator) df.to_parquet(filename) s3_export_task_definition = S3ExportTaskDefinition(input_url=filename, bucket=destination_bucket, key=key_name) streammanager.append_message(streamname, Util.validate_and_serialize_to_json_bytes(s3_export_task_definition)) accumulator = {} nextUploadTimestamp += period else: accumulator.append(message)