IoT Greengrass를 사용하여 AWSIoT 데이터를 Amazon S3에 비용 효율적으로 직접 수집 - AWS 권장 가이드

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

IoT Greengrass를 사용하여 AWSIoT 데이터를 Amazon S3에 비용 효율적으로 직접 수집

작성자: Sebastian Viviani(AWS) 및 Rizwan Syed(AWS)

환경: PoC 또는 파일럿

기술: 분석, IoT

워크로드: 오픈 소스

AWS 서비스: AWS IoT Greengrass, Amazon S3, Amazon Athena

요약

이 패턴은 IoT AWS IoT Greengrass 버전 2 디바이스를 사용하여 사물 인터넷(IoT) 데이터를 Amazon Simple Storage Service(Amazon S3) 버킷에 비용 효율적으로 직접 수집하는 방법을 보여줍니다. 기기는 IoT 데이터를 읽고 영구 리포지토리(즉, 로컬 디스크 또는 볼륨)에 데이터를 저장하는 사용자 지정 구성 요소를 실행합니다. 그런 다음 디바이스는 IoT 데이터를 Apache Parquet 파일로 압축하고 주기적으로 데이터를 S3 버킷에 업로드합니다.

수집하는 IoT 데이터의 양과 속도는 엣지 하드웨어 기능과 네트워크 대역폭에 의해서만 제한됩니다. Amazon Athena를 사용하면 수집된 데이터를 비용 효율적으로 분석할 수 있습니다. Athena는 Amazon Managed Grafana를 사용하여 압축된 Apache Parquet 파일 및 데이터 시각화를 지원합니다.

사전 조건 및 제한 사항

사전 조건 

제한 사항

  • 이 패턴의 데이터는 S3 버킷에 실시간으로 업로드되지 않습니다. 지연 기간이 있으며 지연 기간을 구성할 수 있습니다. 데이터는 에지 디바이스에서 일시적으로 버퍼링된 후 기간이 만료되면 업로드됩니다.

  • SDK 는 Java, Node.js 및 Python에서만 사용할 수 있습니다.

아키텍처

대상 기술 스택 

  • Amazon S3

  • AWS IoT Greengrass

  • MQTT 브로커

  • 스트림 매니저 컴포넌트

대상 아키텍처

다음 다이어그램은 IoT 센서 데이터를 수집하고 해당 데이터를 S3 버킷에 저장하도록 설계된 아키텍처를 보여줍니다.

아키텍처 다이어그램

이 다이어그램은 다음 워크플로를 보여줍니다.

  1. 여러 센서(예: 온도 및 밸브) 업데이트가 로컬 MQTT브로커에 게시됩니다.

  2. 이러한 센서를 구독하는 Parquet 파일 압축기는 주제를 업데이트하고 업데이트를 수신합니다.

  3. Parquet 파일 압축기는 업데이트를 로컬에 저장합니다.

  4. 기간이 경과하면 저장된 파일이 Parquet 파일로 압축되고 스트림 관리자로 전달되어 지정된 S3 버킷에 업로드됩니다.

  5. 스트림 관리자는 ParQuet 파일을 S3 버킷에 업로드합니다.

참고: 스트림 관리자(StreamManager)는 관리되는 구성 요소입니다. Amazon S3로 데이터를 내보내는 방법의 예는 AWSIoT Greengrass 설명서의 스트림 관리자를 참조하세요. 로컬 MQTT브로커를 구성 요소 또는 Eclipse Mosquitto와 같은 다른 브로커로 사용할 수 있습니다.

도구

AWS 도구

  • Amazon Athena는 표준 를 사용하여 Amazon S3에서 직접 데이터를 분석하는 데 도움이 되는 대화형 쿼리 서비스입니다SQL.

  • Amazon Simple Storage Service(S3)는 원하는 양의 데이터를 저장, 보호 및 검색하는 데 도움이 되는 클라우드 기반 객체 스토리지 서비스입니다.

  • AWS IoT Greengrass는 디바이스에서 IoT 애플리케이션을 구축, 배포 및 관리하는 데 도움이 되는 오픈 소스 IoT 엣지 런타임 및 클라우드 서비스입니다.

기타 도구

  • Apache Parquet는 스토리지 및 검색을 위해 설계된 오픈 소스 열 지향 데이터 파일 형식입니다.

  • MQTT (Message Queuing Telemetry Transport)는 제한된 디바이스를 위해 설계된 경량 메시징 프로토콜입니다.

모범 사례

업로드된 데이터에 적합한 파티션 형식 사용

S3 버킷의 루트 접두사 이름(예: "myAwesomeDataSet/" 또는"dataFromSource")에 대한 특정 요구 사항은 없지만 데이터 세트의 용도를 쉽게 이해할 수 있도록 의미 있는 파티션과 접두사를 사용하는 것이 좋습니다.

또한 쿼리가 데이터 세트에서 최적으로 실행되도록 Amazon S3에서 올바른 파티셔닝을 사용하는 것이 좋습니다. 다음 예제에서는 각 Athena 쿼리에서 스캔한 데이터의 양이 최적화되도록 데이터가 HIVE 형식으로 분할됩니다. 이렇게 하면 성능을 개선하고 비용을 절감할 수 있습니다.

s3://<ingestionBucket>/<rootPrefix>/year=YY/month=MM/day=DD/HHMM_<suffix>.parquet

에픽

작업설명필요한 기술

S3 버킷을 생성합니다.

  1. S3 버킷을 생성하거나 기존 버킷을 사용합니다.

  2. IoT 데이터를 수집하려는 S3 버킷에 대해 의미 있는 접두사를 생성합니다(예: s3:\\<bucket>\<prefix>).

  3. 나중에 사용할 수 있도록 접두사를 기록합니다.

앱 개발자

S3 버킷에 IAM 권한을 추가합니다.

사용자에게 이전에 생성한 S3 버킷 및 접두사에 대한 쓰기 액세스 권한을 부여하려면 AWSIoT Greengrass 역할에 다음 IAM 정책을 추가합니다.

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3DataUpload", "Effect": "Allow", "Action": [ "s3:List*", "s3:Put*" ], "Resource": [ "arn:aws:s3:::<ingestionBucket>", "arn:aws:s3:::<ingestionBucket>/<prefix>/*" ] } ] }

자세한 내용은 Aurora 설명서의 Amazon S3 리소스에 액세스하기 위한 IAM 정책 생성을 참조하세요.

그런 다음 올바른 AWS 보안 주체 를 사용하여 쓰기 액세스를 허용하도록 S3 버킷에 대한 리소스 정책(필요한 경우)을 업데이트합니다. https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-bucket-user-policy-specifying-principal-intro.html

앱 개발자
작업설명필요한 기술

구성 요소의 레시피를 업데이트합니다.

다음 예제를 기반으로 배포를 생성할 때 구성 요소 구성을 업데이트합니다.

{ "region": "<region>", "parquet_period": <period>, "s3_bucket": "<s3Bucket>", "s3_key_prefix": "<s3prefix>" }

<region> 를 AWS 리전, <period> 를 주기적 간격, <s3Bucket> 를 S3 버킷, 를 접두사<s3prefix>로 바꿉니다.

앱 개발자

구성 요소를 생성합니다.

다음 중 하나를 수행합니다.

  • 구성 요소를 생성합니다.

  • CI/CD 파이프라인(있는 경우)에 구성 요소를 추가합니다. 아티팩트 리포지토리에서 AWS IoT Greengrass 아티팩트 버킷으로 아티팩트를 복사해야 합니다. 그런 다음 AWS IoT Greengrass 구성 요소를 생성하거나 업데이트합니다.

  • MQTT 브로커를 구성 요소로 추가하거나 나중에 수동으로 추가합니다. 참고: 이 결정은 브로커와 함께 사용할 수 있는 인증 체계에 영향을 미칩니다. 브로커를 수동으로 추가하면 AWS 브로커가 IoT Greengrass에서 분리되고 브로커에서 지원되는 모든 인증 체계가 활성화됩니다. AWS 제공된 브로커 구성 요소에는 사전 정의된 인증 체계가 있습니다. 자세한 내용은 MQTT 3.1.1 브로커(Moquette)MQTT5 브로커(EMQX)를 참조하세요.

앱 개발자

MQTT 클라이언트를 업데이트합니다.

구성 요소가 브로커에 로컬로 연결되므로 샘플 코드는 인증을 사용하지 않습니다. 시나리오가 다른 경우 필요에 따라 MQTT 클라이언트 섹션을 업데이트합니다. 또한 다음 사항을 수행하세요.

  1. 구독의 MQTT 주제를 업데이트합니다.

  2. 각 소스의 MQTT 메시지가 다를 수 있으므로 필요에 따라 메시지 구문 분석기를 업데이트합니다.

앱 개발자
작업설명필요한 기술

코어 디바이스 배포를 업데이트합니다.

AWS IoT Greengrass 버전 2 코어 디바이스의 배포가 이미 있는 경우 배포를 수정합니다. 배포가 존재하지 않는 경우 새 배포를 생성하세요.

구성 요소에 올바른 이름을 지정하려면 다음을 기반으로 새 구성 요소(필요한 경우)에 대한 로그 관리자 구성을 업데이트하세요.

{ "logsUploaderConfiguration": { "systemLogsConfiguration": { ... }, "componentLogsConfigurationMap": { "<com.iot.ingest.parquet>": { "minimumLogLevel": "INFO", "diskSpaceLimit": "20", "diskSpaceLimitUnit": "MB", "deleteLogFileAfterCloudUpload": "false" } ... } }, "periodicUploadIntervalSec": "300" }

마지막으로 AWS IoT Greengrass 코어 디바이스의 배포 개정을 완료합니다.

앱 개발자
작업설명필요한 기술

로그에서 AWS IoT Greengrass 볼륨을 확인합니다.

다음 사항을 확인합니다.

  • MQTT 클라이언트가 로컬 MQTT브로커에 성공적으로 연결되었습니다.

  • MQTT 클라이언트가 올바른 주제를 구독합니다.

  • MQTT 주제에 대한 센서 업데이트 메시지가 브로커에 전송됩니다.

  • 파케이 압축은 매 주기마다 발생합니다.

앱 개발자

S3 버킷을 선택합니다.

데이터가 S3 버킷에 업로드되고 있는지 검증합니다. 매 기간마다 업로드되는 파일을 확인할 수 있습니다.

다음 섹션에서 데이터를 쿼리하여 데이터가 S3 버킷에 업로드되었는지 확인할 수도 있습니다.

앱 개발자
작업설명필요한 기술

데이터베이스 및 테이블을 생성합니다.

  1. AWS Glue 데이터베이스를 생성합니다(필요한 경우).

  2. Glue에서 테이블을 수동으로 생성하거나 AWS AWSGlue에서 어크롤러를 실행하여 생성합니다.

앱 개발자

Athena에게 데이터에 대한 액세스 권한을 부여합니다.

  1. Athena가 S3 버킷에 액세스할 수 있도록 권한을 업데이트합니다. 자세한 내용은 Athena 설명서의 AWS Glue 데이터 카탈로그에서 데이터베이스 및 테이블에 대한 세분화된 액세스를 참조하세요.

  2. 데이터베이스의 테이블을 쿼리합니다.

앱 개발자

문제 해결

문제Solution

MQTT 클라이언트 연결 실패

MQTT 클라이언트 구독 실패

MQTT 브로커에 대한 권한을 확인합니다. 의 MQTT브로커가 있는 경우 MQTT 3.1.1 브로커(모켓)MQTT5 브로커(EMQX)를 AWS참조하세요.

파케이 파일은 생성되지 않습니다

  • MQTT 주제가 올바른지 확인합니다.

  • 센서의 MQTT 메시지가 올바른 형식인지 확인합니다.

객체가 S3 버킷에 업로드되지 않았습니다.

  • 인터넷 연결 및 엔드포인트 연결이 있는지 검증합니다.

  • S3 버킷의 리소스 정책이 올바른지 검증합니다.

  • AWS IoT Greengrass 버전 2 코어 디바이스 역할에 대한 권한을 확인합니다.

관련 리소스

추가 정보

비용 분석

다음 비용 분석 시나리오는 이 패턴에서 다루는 데이터 수집 접근 방식이 AWS 클라우드의 데이터 수집 비용에 어떤 영향을 미칠 수 있는지 보여줍니다. 이 시나리오의 요금 예시는 발행 시점의 가격을 기준으로 합니다. 요금은 변경될 수 있습니다. 또한 비용은 AWS 리전, AWS 서비스 할당량 및 클라우드 환경과 관련된 기타 요인에 따라 달라질 수 있습니다.

입력 신호 세트

이 분석에서는 다음과 같은 입력 신호 세트를 기반으로 IoT 수집 비용을 사용 가능한 다른 대안과 비교합니다.

신호 수

Frequency(주파수)

신호당 데이터

125

25Hz

8 bytes

이 시나리오에서 시스템은 125개의 신호를 수신합니다. 각 신호는 8바이트이며 40밀리초(25Hz)마다 발생합니다. 이러한 신호는 개별적으로 제공되거나 공통 페이로드에 그룹화될 수 있습니다. 필요에 따라 이러한 신호를 분리하고 패킹할 수 있습니다. 지연 시간도 확인할 수 있습니다. 지연 시간은 데이터 수신, 누적 및 수집 기간으로 구성됩니다.

비교를 위해 이 시나리오의 수집 작업은 us-east-1 AWS 리전을 기반으로 합니다. 비용 비교는 AWS 서비스에만 적용됩니다. 하드웨어 또는 연결과 같은 기타 비용은 분석에 포함되지 않습니다.

비용 비교

다음 표는 각 수집 방법에 대한 월별 비용을 미국 달러(USD)로 보여줍니다.

방법

월별 비용

AWS IoT SiteWise *

331.77 USD

AWS 데이터 처리 팩이 포함된 IoT SiteWise Edge(모든 데이터를 엣지에 보관)

200 USD

AWS 원시 데이터에 액세스하기 위한 IoT Core 및 Amazon S3 규칙

84.54 USD

엣지에서의 파케이 파일 압축 및 Amazon S3에 업로드

0.5 USD

*Service Quotas를 준수하려면 데이터를 다운샘플링해야 합니다. 즉, 이 방법을 사용하면 데이터가 약간 손실될 수 있습니다.

대체 방법

이 섹션에서는 다음과 같은 대체 방법에 대한 해당 비용을 보여줍니다.

  • AWS IoT SiteWise - 각 신호는 개별 메시지에 업로드해야 합니다. 따라서 월별 총 메시지 수는 125×25×3600×24×30, 즉 월별 81억 개의 메시지입니다. 그러나 AWS IoT SiteWise 는 속성당 초당 10개의 데이터 포인트만 처리할 수 있습니다. 데이터를 10Hz로 다운샘플링한다고 가정하면 월별 메시지 수는 125×10×3600×24×30, 즉 32억 4천만 개로 줄어듭니다. 측정값을 10개 그룹(메시지 백만 개USD당 1개)으로 묶는 게시자 구성 요소를 사용하는 경우 월별 비용은 324USD입니다. 각 메시지가 8바이트(1Kb/125)라고 가정하면 25.92Gb의 데이터 스토리지가 됩니다. 이렇게 하면 월별 비용이 7.77USD이 추가됩니다. 첫 달의 총 비용은 331.77이며 USD 매월 7.77 USD 증가합니다.

  • 엣지AWS에서 완전히 처리된 모든 모델 및 신호를 포함한 데이터 처리 팩이 포함된 IoT SiteWise Edge(즉, 클라우드 수집 없음) - 비용을 절감하고 엣지에서 계산되는 모든 모델을 구성하는 대안으로 데이터 처리 팩을 사용할 수 있습니다. 이는 실제 계산이 수행되지 않더라도 스토리지 및 시각화에만 사용할 수 있습니다. 이 경우 엣지 게이트웨이에 강력한 하드웨어를 사용해야 합니다. 고정 비용은 USD 매월 200입니다.

  • 의해 AWS IoT Core로 직접 수집MQTT되고 Amazon S3에 원시 데이터를 저장하기 위한 IoT 규칙 - 모든 신호가 공통 페이로드에 게시된다고 가정할 때 AWSIoT Core에 게시된 총 메시지 수는 매월 25×3600×24×30 또는 6,480만 개입니다. 메시지 백만 개USD당 1개로 매월 64.8USD개 비용이 발생합니다. 규칙 활성화 백만 USD개당 0.15이고 메시지당 하나의 규칙이 있는 경우 월별 비용은 19.44USD입니다. Amazon S3의 스토리지 GbUSD당 0.023의 비용으로, 한 달USD에 1.5가 추가됩니다(새 데이터를 반영하도록 매월 증가). 첫 달의 총 비용은 84.54이며 USD 매월 1.5씩 USD 증가합니다.

  • Parquet 파일의 엣지에서 데이터를 압축하여 Amazon S3에 업로드(제안된 방법) - 압축률은 데이터 유형에 따라 다릅니다. 에 대해 테스트된 것과 동일한 산업 데이터를 사용하면 한 달 동안의 MQTT총 출력 데이터는 1.2Gb입니다. 이 요금은 USD 매월 0.03입니다. 다른 벤치마크에서 설명한 압축률(무작위 데이터 사용)은 66% 정도입니다(최악의 시나리오에 가까움). 총 데이터는 21Gb이며 비용은 USD 매월 0.5입니다.

파케이 파일 생성기

다음 코드 예제는 Python으로 작성된 Parquet 파일 생성기의 구조를 보여줍니다. 코드 예제는 설명을 위한 용도로만 사용되며 사용자 환경에 붙여넣으면 작동하지 않습니다.

import queue import paho.mqtt.client as mqtt import pandas as pd #queue for decoupling the MQTT thread messageQueue = queue.Queue() client = mqtt.Client() streammanager = StreamManagerClient() def feederListener(topic, message): payload = { "topic" : topic, "payload" : message, } messageQueue.put_nowait(payload) def on_connect(client_instance, userdata, flags, rc): client.subscribe("#",qos=0) def on_message(client, userdata, message): feederListener(topic=str(message.topic), message=str(message.payload.decode("utf-8"))) filename = "tempfile.parquet" streamname = "mystream" destination_bucket= "amzn-s3-demo-bucket" keyname="mykey" period= 60 client.on_connect = on_connect client.on_message = on_message streammanager.create_message_stream( MessageStreamDefinition(name=streamname, strategy_on_full=StrategyOnFull.OverwriteOldestData) ) while True: try: message = messageQueue.get(timeout=myArgs.mqtt_timeout) except (queue.Empty): logger.warning("MQTT message reception timed out") currentTimestamp = getCurrentTime() if currentTimestamp >= nextUploadTimestamp: df = pd.DataFrame.from_dict(accumulator) df.to_parquet(filename) s3_export_task_definition = S3ExportTaskDefinition(input_url=filename, bucket=destination_bucket, key=key_name) streammanager.append_message(streamname, Util.validate_and_serialize_to_json_bytes(s3_export_task_definition)) accumulator = {} nextUploadTimestamp += period else: accumulator.append(message)