MQTT 메시지 게시/구독 AWS IoT Core - AWS IoT Greengrass

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

MQTT 메시지 게시/구독 AWS IoT Core

AWS IoT Core MQTT 메시징 IPC 서비스를 사용하면 MQTT 메시지를 주고 받을 수 있습니다. AWS IoT Core구성 요소는 메시지를 AWS IoT Core 게시하고 주제에 구독하여 다른 소스의 MQTT 메시지를 처리할 수 있습니다. MQTT AWS IoT Core 구현에 대한 자세한 내용은 개발자 안내서의 MQTT를AWS IoT Core 참조하십시오.

참고

이 MQTT 메시징 IPC 서비스를 사용하면 메시지를 교환할 수 있습니다. AWS IoT Core구성 요소 간에 메시지를 교환하는 방법에 대한 자세한 내용은 을 참조하십시오. 로컬 메시지 게시/구독

최소 SDK 버전

다음 표에는 MQTT 메시지를 게시하고 MQTT 메시지를 구독하는 데 사용해야 하는 최소 버전이 나와 있습니다. AWS IoT Device SDK AWS IoT Core

권한 부여

사용자 지정 구성 요소에서 AWS IoT Core MQTT 메시징을 사용하려면 구성 요소가 주제에 대한 메시지를 보내고 받을 수 있도록 허용하는 권한 부여 정책을 정의해야 합니다. 권한 부여 정책 정의에 대한 자세한 내용은 을 참조하십시오구성 요소가 IPC 작업을 수행할 수 있도록 승인하십시오..

AWS IoT Core MQTT 메시징의 권한 부여 정책에는 다음과 같은 속성이 있습니다.

IPC 서비스 식별자: aws.greengrass.ipc.mqttproxy

Operation 설명 리소스

aws.greengrass#PublishToIoTCore

구성 요소가 지정한 MQTT 주제에 AWS IoT Core 대해 메시지를 게시할 수 있도록 합니다.

모든 주제에 대한 * 액세스를 허용하는 또는 와 test/topic 같은 주제 문자열. MQTT 주제 와일드카드 (#+) 를 사용하여 여러 리소스를 일치시킬 수 있습니다.

aws.greengrass#SubscribeToIoTCore

구성 요소가 지정한 주제의 메시지를 구독할 수 있도록 합니다. AWS IoT Core

주제 문자열 (예test/topic: 모든 주제에 대한 * 액세스를 허용하는 또는). MQTT 주제 와일드카드 (#+) 를 사용하여 여러 리소스를 일치시킬 수 있습니다.

*

구성 요소가 지정한 주제에 대한 AWS IoT Core MQTT 메시지를 게시하고 구독할 수 있도록 합니다.

모든 주제에 대한 * 액세스를 허용하는 또는 와 test/topic 같은 주제 문자열. MQTT 주제 와일드카드 (#+) 를 사용하여 여러 리소스를 일치시킬 수 있습니다.

MQTT 권한 부여 정책의 MQTT 와일드카드 AWS IoT Core

MQTT IPC 권한 부여 정책에서 AWS IoT Core MQTT 와일드카드를 사용할 수 있습니다. 구성 요소는 권한 부여 정책에서 허용하는 주제 필터와 일치하는 주제를 게시하고 구독할 수 있습니다. 예를 들어 구성 요소의 권한 부여 정책이 액세스 권한을 부여하는 test/topic/# 경우 구성 요소는 구독할 test/topic/# 수 있고 게시 및 구독할 수 있습니다test/topic/filter.

AWS IoT Core MQTT 권한 부여 정책의 레시피 변수

v2.6.0 이상의 Greengrass 핵을 사용하는 경우 권한 부여 정책에서 레시피 변수를 사용할 수 있습니다. {iot:thingName} 이 기능을 사용하면 코어 장치 그룹에 대해 단일 권한 부여 정책을 구성하여 각 코어 장치가 고유한 이름을 포함하는 항목에만 액세스할 수 있습니다. 예를 들어, 구성 요소가 다음 주제 리소스에 액세스하도록 허용할 수 있습니다.

devices/{iot:thingName}/messages

자세한 내용은 레시피 변수병합 업데이트에 레시피 변수를 사용하십시오. 섹션을 참조하세요.

권한 부여 정책 예제

다음 권한 부여 정책 예제를 참조하여 구성 요소에 대한 권한 부여 정책을 구성할 수 있습니다.

예 무제한 액세스가 포함된 권한 부여 정책 예시

다음 예제 권한 부여 정책은 구성 요소가 모든 주제를 게시하고 구독할 수 있도록 허용합니다.

JSON
{ "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.MyIoTCorePubSubComponent:mqttproxy:1": { "policyDescription": "Allows access to publish/subscribe to all topics.", "operations": [ "aws.greengrass#PublishToIoTCore", "aws.greengrass#SubscribeToIoTCore" ], "resources": [ "*" ] } } } }
YAML
--- accessControl: aws.greengrass.ipc.mqttproxy: com.example.MyIoTCorePubSubComponent:mqttproxy:1: policyDescription: Allows access to publish/subscribe to all topics. operations: - aws.greengrass#PublishToIoTCore - aws.greengrass#SubscribeToIoTCore resources: - "*"
예 액세스가 제한된 권한 부여 정책의 예

다음 예제 권한 부여 정책은 구성 요소가 factory/1/events 및 라는 두 개의 주제를 게시하고 구독할 수 있도록 허용합니다factory/1/actions.

JSON
{ "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.MyIoTCorePubSubComponent:mqttproxy:1": { "policyDescription": "Allows access to publish/subscribe to factory 1 topics.", "operations": [ "aws.greengrass#PublishToIoTCore", "aws.greengrass#SubscribeToIoTCore" ], "resources": [ "factory/1/actions", "factory/1/events" ] } } } }
YAML
--- accessControl: aws.greengrass.ipc.mqttproxy: "com.example.MyIoTCorePubSubComponent:mqttproxy:1": policyDescription: Allows access to publish/subscribe to factory 1 topics. operations: - aws.greengrass#PublishToIoTCore - aws.greengrass#SubscribeToIoTCore resources: - factory/1/actions - factory/1/events
예 핵심 장치 그룹에 대한 권한 부여 정책 예제
중요

이 예제에서는 v2.6.0 이상에서 사용할 수 있는 Greengrass 핵 구성 요소 기능을 사용합니다. Greengrass nucleus v2.6.0은 구성 요소 구성과 같은 대부분의 레시피 변수에 대한 지원을 추가합니다. {iot:thingName}

다음 예제 권한 부여 정책은 구성 요소가 구성 요소를 실행하는 핵심 장치의 이름이 포함된 주제를 게시하고 구독하도록 허용합니다.

JSON
{ "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.MyIoTCorePubSubComponent:mqttproxy:1": { "policyDescription": "Allows access to publish/subscribe to all topics.", "operations": [ "aws.greengrass#PublishToIoTCore", "aws.greengrass#SubscribeToIoTCore" ], "resources": [ "factory/1/devices/{iot:thingName}/controls" ] } } } }
YAML
--- accessControl: aws.greengrass.ipc.mqttproxy: "com.example.MyIoTCorePubSubComponent:mqttproxy:1": policyDescription: Allows access to publish/subscribe to all topics. operations: - aws.greengrass#PublishToIoTCore - aws.greengrass#SubscribeToIoTCore resources: - factory/1/devices/{iot:thingName}/controls

PublishToIoTCore

주제에 대한 MQTT 메시지를 게시합니다 AWS IoT Core .

MQTT 메시지를 에 게시하는 AWS IoT Core경우 초당 100개의 트랜잭션 할당량이 설정됩니다. 이 할당량을 초과하면 Greengrass 장치에서 메시지가 처리를 위해 대기열에 추가됩니다. 또한 초당 512Kb의 데이터 할당량과 계정 전체 할당량은 초당 20,000건의 게시 (일부 경우 2,000건) 입니다. AWS 리전 MQTT 메시지 브로커 한도에 대한 자세한 내용은 메시지 브로커 및 AWS IoT Core프로토콜 제한 및AWS IoT Core 할당량을 참조하십시오.

이 할당량을 초과할 경우 Greengrass 디바이스는 메시지 게시를 로 제한합니다. AWS IoT Core메시지는 스풀러의 메모리에 저장됩니다. 기본적으로 스풀러에 할당된 메모리는 2.5Mb입니다. 스풀러가 가득 차면 새 메시지가 거부됩니다. 스풀러의 크기를 늘릴 수 있습니다. 자세한 내용은 그린그래스 핵 설명서의 구성을(를) 참조하세요. 스풀러가 가득 차서 할당된 메모리를 늘릴 필요가 없도록 게시 요청을 초당 100개 이하로 제한하십시오.

애플리케이션에서 더 빠른 속도로 또는 더 큰 메시지를 전송해야 하는 경우 를 사용하여 Kinesis Data Streams로 메시지를 보내는 것을 고려해 보십시오. 스트림 관리자 스트림 관리자 구성 요소는 대용량 데이터를 로 전송하도록 설계되었습니다. AWS 클라우드자세한 정보는 Greengrass 코어 디바이스의 데이터 스트림 관리을 참조하세요.

요청

이 작업의 요청에는 다음과 같은 매개 변수가 있습니다.

topicName(Python:topic_name)

메시지를 게시할 주제.

qos

사용할 MQTT QoS입니다. 이 열거형의 값은 QOS 다음과 같습니다.

  • AT_MOST_ONCE— QoS 0. MQTT 메시지는 최대 한 번 전송됩니다.

  • AT_LEAST_ONCE— QoS 1. MQTT 메시지가 한 번 이상 전달되었습니다.

payload

(선택 사항) 블럽 형태의 메시지 페이로드.

MQTT 5를 사용하는 경우 v2.10.0 이상에서 다음 기능을 사용할 수 있습니다. 그린그래스 핵 MQTT 3.1.1을 사용할 때는 이러한 기능이 무시됩니다. 다음 표에는 이러한 기능에 액세스하는 데 사용해야 하는 AWS IoT 기기 SDK의 최소 버전이 나와 있습니다.

payloadFormat

(선택 사항) 메시지 페이로드의 형식. 를 설정하지 않으면 유형이 다음과 payloadFormat 같은 것으로 간주됩니다. BYTES 열거형의 값은 다음과 같습니다.

  • BYTES— 페이로드의 내용은 바이너리 블롭입니다.

  • UTF8— 페이로드의 내용은 UTF8 문자열입니다.

retain

(선택 사항) 게시할 때 MQTT 보존 옵션을 로 설정할지 여부를 나타냅니다. true

userProperties

(선택 사항) 전송할 애플리케이션별 UserProperty 객체 목록. UserProperty객체는 다음과 같이 정의됩니다.

UserProperty: key: string value: string
messageExpiryIntervalSeconds

(선택 사항) 메시지가 만료되어 서버에서 삭제되기까지의 시간 (초). 이 값을 설정하지 않으면 메시지가 만료되지 않습니다.

correlationData

(선택 사항) 요청을 응답과 연결하는 데 사용할 수 있는 요청에 추가된 정보입니다.

responseTopic

(선택 사항) 응답 메시지에 사용해야 하는 주제입니다.

contentType

(선택 사항) 메시지 콘텐츠 유형의 애플리케이션별 식별자.

응답

이 작업은 응답에 어떠한 정보도 제공하지 않습니다.

예제

다음 예제는 사용자 지정 구성 요소 코드에서 이 작업을 호출하는 방법을 보여줍니다.

Java (IPC client V2)
예: 메시지 게시
package com.aws.greengrass.docs.samples.ipc; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreRequest; import software.amazon.awssdk.aws.greengrass.model.QOS; import java.nio.charset.StandardCharsets; public class PublishToIoTCore { public static void main(String[] args) { String topic = args[0]; String message = args[1]; QOS qos = QOS.get(args[2]); try (GreengrassCoreIPCClientV2 ipcClientV2 = GreengrassCoreIPCClientV2.builder().build()) { ipcClientV2.publishToIoTCore(new PublishToIoTCoreRequest() .withTopicName(topic) .withPayload(message.getBytes(StandardCharsets.UTF_8)) .withQos(qos)); System.out.println("Successfully published to topic: " + topic); } catch (Exception e) { System.err.println("Exception occurred."); e.printStackTrace(); System.exit(1); } } }
Python (IPC client V2)
예: 메시지 게시
참고

이 예제에서는 AWS IoT Device SDK Python v2용 버전 1.5.4 이상을 사용하고 있다고 가정합니다.

import awsiot.greengrasscoreipc.clientv2 as clientV2 topic = 'my/topic' qos = '1' payload = 'Hello, World' ipc_client = clientV2.GreengrassCoreIPCClientV2() resp = ipc_client.publish_to_iot_core(topic_name=topic, qos=qos, payload=payload) ipc_client.close()
Java (IPC client V1)
예: 메시지 게시
참고

이 예제에서는 IPCUtils 클래스를 사용하여 AWS IoT Greengrass Core IPC 서비스에 대한 연결을 생성합니다. 자세한 정보는 AWS IoT Greengrass 코어 IPC 서비스에 연결을 참조하세요.

package com.aws.greengrass.docs.samples.ipc; import com.aws.greengrass.docs.samples.ipc.util.IPCUtils; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient; import software.amazon.awssdk.aws.greengrass.PublishToIoTCoreResponseHandler; import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreRequest; import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreResponse; import software.amazon.awssdk.aws.greengrass.model.QOS; import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class PublishToIoTCore { public static final int TIMEOUT_SECONDS = 10; public static void main(String[] args) { String topic = args[0]; String message = args[1]; QOS qos = QOS.get(args[2]); try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); PublishToIoTCoreResponseHandler responseHandler = PublishToIoTCore.publishBinaryMessageToTopic(ipcClient, topic, message, qos); CompletableFuture<PublishToIoTCoreResponse> futureResponse = responseHandler.getResponse(); try { futureResponse.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); System.out.println("Successfully published to topic: " + topic); } catch (TimeoutException e) { System.err.println("Timeout occurred while publishing to topic: " + topic); } catch (ExecutionException e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { throw e; } } } catch (InterruptedException e) { System.out.println("IPC interrupted."); } catch (ExecutionException e) { System.err.println("Exception occurred when using IPC."); e.printStackTrace(); System.exit(1); } } public static PublishToIoTCoreResponseHandler publishBinaryMessageToTopic(GreengrassCoreIPCClient greengrassCoreIPCClient, String topic, String message, QOS qos) { PublishToIoTCoreRequest publishToIoTCoreRequest = new PublishToIoTCoreRequest(); publishToIoTCoreRequest.setTopicName(topic); publishToIoTCoreRequest.setPayload(message.getBytes(StandardCharsets.UTF_8)); publishToIoTCoreRequest.setQos(qos); return greengrassCoreIPCClient.publishToIoTCore(publishToIoTCoreRequest, Optional.empty()); } }
Python (IPC client V1)
예: 메시지 게시
참고

이 예제에서는 AWS IoT Device SDK Python v2용 버전 1.5.4 이상을 사용하고 있다고 가정합니다.

import awsiot.greengrasscoreipc import awsiot.greengrasscoreipc.client as client from awsiot.greengrasscoreipc.model import ( QOS, PublishToIoTCoreRequest ) TIMEOUT = 10 ipc_client = awsiot.greengrasscoreipc.connect() topic = "my/topic" message = "Hello, World" qos = QOS.AT_LEAST_ONCE request = PublishToIoTCoreRequest() request.topic_name = topic request.payload = bytes(message, "utf-8") request.qos = qos operation = ipc_client.new_publish_to_iot_core() operation.activate(request) future_response = operation.get_response() future_response.result(TIMEOUT)
C++
예: 메시지 게시
#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String message("Hello, World!"); String topic("my/topic"); QOS qos = QOS_AT_MOST_ONCE; int timeout = 10; PublishToIoTCoreRequest request; Vector<uint8_t> messageData({message.begin(), message.end()}); request.SetTopicName(topic); request.SetPayload(messageData); request.SetQos(qos); auto operation = ipcClient.NewPublishToIoTCore(); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); (void)error; // Handle operation error. } else { // Handle RPC error. } } return 0; }
JavaScript
예: 메시지 게시
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc"; import {QOS, PublishToIoTCoreRequest} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model"; class PublishToIoTCore { private ipcClient: greengrasscoreipc.Client private readonly topic: string; constructor() { // define your own constructor, e.g. this.topic = "<define_your_topic>"; this.publishToIoTCore().then(r => console.log("Started workflow")); } private async publishToIoTCore() { try { const request: PublishToIoTCoreRequest = { topicName: this.topic, qos: QOS.AT_LEAST_ONCE, // you can change this depending on your use case } this.ipcClient = await getIpcClient(); await this.ipcClient.publishToIoTCore(request); } catch (e) { // parse the error depending on your use cases throw e } } } export async function getIpcClient(){ try { const ipcClient = greengrasscoreipc.createClient(); await ipcClient.connect() .catch(error => { // parse the error depending on your use cases throw error; }); return ipcClient } catch (err) { // parse the error depending on your use cases throw err } } // starting point const publishToIoTCore = new PublishToIoTCore();

SubscribeToIoTCore

주제 또는 주제 AWS IoT Core 필터에서 MQTT 메시지를 구독하십시오. AWS IoT Greengrass Core 소프트웨어는 구성 요소의 수명 주기가 끝나면 구독을 제거합니다.

이 작업은 이벤트 메시지 스트림을 구독하는 구독 작업입니다. 이 작업을 사용하려면 이벤트 메시지, 오류 및 스트림 폐쇄를 처리하는 함수가 포함된 스트림 응답 핸들러를 정의하십시오. 자세한 정보는 IPC 이벤트 스트림을 구독하세요.을 참조하세요.

이벤트 메시지 유형: IoTCoreMessage

요청

이 작업의 요청에는 다음과 같은 매개변수가 있습니다.

topicName(Python:topic_name)

구독하고 싶은 주제. MQTT 주제 와일드카드 (#+) 를 사용하여 여러 주제를 구독할 수 있습니다.

qos

사용할 MQTT QoS입니다. 이 열거형의 값은 QOS 다음과 같습니다.

  • AT_MOST_ONCE— QoS 0. MQTT 메시지는 최대 한 번 전송됩니다.

  • AT_LEAST_ONCE— QoS 1. MQTT 메시지가 한 번 이상 전달되었습니다.

응답

이 작업의 응답에는 다음 정보가 포함됩니다.

messages

MQTT 메시지 스트림. 이 객체에는 다음 정보가 들어 IoTCoreMessage 있습니다.

message

MQTT 메시지입니다. 이 개체MQTTMessage, 에는 다음 정보가 들어 있습니다.

topicName(Python:topic_name)

메시지가 게시된 주제.

payload

(선택 사항) 블럽 형태의 메시지 페이로드입니다.

MQTT 5를 사용하는 경우 v2.10.0 이상에서 다음 기능을 사용할 수 있습니다. 그린그래스 핵 MQTT 3.1.1을 사용할 때는 이러한 기능이 무시됩니다. 다음 표에는 이러한 기능에 액세스하는 데 사용해야 하는 AWS IoT 기기 SDK의 최소 버전이 나와 있습니다.

payloadFormat

(선택 사항) 메시지 페이로드의 형식. 를 설정하지 않으면 유형이 다음과 payloadFormat 같은 것으로 간주됩니다. BYTES 열거형의 값은 다음과 같습니다.

  • BYTES— 페이로드의 내용은 바이너리 블롭입니다.

  • UTF8— 페이로드의 내용은 UTF8 문자열입니다.

retain

(선택 사항) 게시할 때 MQTT 보존 옵션을 로 설정할지 여부를 나타냅니다. true

userProperties

(선택 사항) 전송할 애플리케이션별 UserProperty 객체 목록. UserProperty객체는 다음과 같이 정의됩니다.

UserProperty: key: string value: string
messageExpiryIntervalSeconds

(선택 사항) 메시지가 만료되어 서버에서 삭제되기까지의 시간 (초). 이 값을 설정하지 않으면 메시지가 만료되지 않습니다.

correlationData

(선택 사항) 요청을 응답과 연결하는 데 사용할 수 있는 요청에 추가된 정보입니다.

responseTopic

(선택 사항) 응답 메시지에 사용해야 하는 주제입니다.

contentType

(선택 사항) 메시지 콘텐츠 유형의 애플리케이션별 식별자입니다.

예제

다음 예제는 사용자 지정 구성 요소 코드에서 이 작업을 호출하는 방법을 보여줍니다.

Java (IPC client V2)
예: 메시지 구독
package com.aws.greengrass.docs.samples.ipc; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; import software.amazon.awssdk.aws.greengrass.SubscribeToIoTCoreResponseHandler; import software.amazon.awssdk.aws.greengrass.model.QOS; import software.amazon.awssdk.aws.greengrass.model.IoTCoreMessage; import software.amazon.awssdk.aws.greengrass.model.SubscribeToIoTCoreRequest; import software.amazon.awssdk.aws.greengrass.model.SubscribeToIoTCoreResponse; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.function.Consumer; import java.util.function.Function; public class SubscribeToIoTCore { public static void main(String[] args) { String topic = args[0]; QOS qos = QOS.get(args[1]); Consumer<IoTCoreMessage> onStreamEvent = ioTCoreMessage -> System.out.printf("Received new message on topic %s: %s%n", ioTCoreMessage.getMessage().getTopicName(), new String(ioTCoreMessage.getMessage().getPayload(), StandardCharsets.UTF_8)); Optional<Function<Throwable, Boolean>> onStreamError = Optional.of(e -> { System.err.println("Received a stream error."); e.printStackTrace(); return false; }); Optional<Runnable> onStreamClosed = Optional.of(() -> System.out.println("Subscribe to IoT Core stream closed.")); try (GreengrassCoreIPCClientV2 ipcClientV2 = GreengrassCoreIPCClientV2.builder().build()) { SubscribeToIoTCoreRequest request = new SubscribeToIoTCoreRequest() .withTopicName(topic) .withQos(qos); GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToIoTCoreResponse, SubscribeToIoTCoreResponseHandler> streamingResponse = ipcClientV2.subscribeToIoTCore(request, onStreamEvent, onStreamError, onStreamClosed); streamingResponse.getResponse(); System.out.println("Successfully subscribed to topic: " + topic); // Keep the main thread alive, or the process will exit. while (true) { Thread.sleep(10000); } // To stop subscribing, close the stream. streamingResponse.getHandler().closeStream(); } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } catch (Exception e) { System.err.println("Exception occurred."); e.printStackTrace(); System.exit(1); } } }
Python (IPC client V2)
예: 메시지 구독
참고

이 예제에서는 AWS IoT Device SDK Python v2용 버전 1.5.4 이상을 사용하고 있다고 가정합니다.

import threading import traceback import awsiot.greengrasscoreipc.clientv2 as clientV2 topic = 'my/topic' qos = '1' def on_stream_event(event): try: topic_name = event.message.topic_name message = str(event.message.payload, 'utf-8') print(f'Received new message on topic {topic_name}: {message}') except: traceback.print_exc() def on_stream_error(error): # Return True to close stream, False to keep stream open. return True def on_stream_closed(): pass ipc_client = clientV2.GreengrassCoreIPCClientV2() resp, operation = ipc_client.subscribe_to_iot_core( topic_name=topic, qos=qos, on_stream_event=on_stream_event, on_stream_error=on_stream_error, on_stream_closed=on_stream_closed ) # Keep the main thread alive, or the process will exit. event = threading.Event() event.wait() # To stop subscribing, close the operation stream. operation.close() ipc_client.close()
Java (IPC client V1)
예: 메시지 구독
참고

이 예제에서는 IPCUtils 클래스를 사용하여 AWS IoT Greengrass Core IPC 서비스에 대한 연결을 생성합니다. 자세한 정보는 AWS IoT Greengrass 코어 IPC 서비스에 연결을 참조하세요.

package com.aws.greengrass.docs.samples.ipc; import com.aws.greengrass.docs.samples.ipc.util.IPCUtils; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient; import software.amazon.awssdk.aws.greengrass.SubscribeToIoTCoreResponseHandler; import software.amazon.awssdk.aws.greengrass.model.*; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class SubscribeToIoTCore { public static final int TIMEOUT_SECONDS = 10; public static void main(String[] args) { String topic = args[0]; QOS qos = QOS.get(args[1]); try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); StreamResponseHandler<IoTCoreMessage> streamResponseHandler = new SubscriptionResponseHandler(); SubscribeToIoTCoreResponseHandler responseHandler = SubscribeToIoTCore.subscribeToIoTCore(ipcClient, topic, qos, streamResponseHandler); CompletableFuture<SubscribeToIoTCoreResponse> futureResponse = responseHandler.getResponse(); try { futureResponse.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); System.out.println("Successfully subscribed to topic: " + topic); } catch (TimeoutException e) { System.err.println("Timeout occurred while subscribing to topic: " + topic); } catch (ExecutionException e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while subscribing to topic: " + topic); } else { throw e; } } // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } // To stop subscribing, close the stream. responseHandler.closeStream(); } catch (InterruptedException e) { System.out.println("IPC interrupted."); } catch (ExecutionException e) { System.err.println("Exception occurred when using IPC."); e.printStackTrace(); System.exit(1); } } public static SubscribeToIoTCoreResponseHandler subscribeToIoTCore(GreengrassCoreIPCClient greengrassCoreIPCClient, String topic, QOS qos, StreamResponseHandler<IoTCoreMessage> streamResponseHandler) { SubscribeToIoTCoreRequest subscribeToIoTCoreRequest = new SubscribeToIoTCoreRequest(); subscribeToIoTCoreRequest.setTopicName(topic); subscribeToIoTCoreRequest.setQos(qos); return greengrassCoreIPCClient.subscribeToIoTCore(subscribeToIoTCoreRequest, Optional.of(streamResponseHandler)); } public static class SubscriptionResponseHandler implements StreamResponseHandler<IoTCoreMessage> { @Override public void onStreamEvent(IoTCoreMessage ioTCoreMessage) { try { String topic = ioTCoreMessage.getMessage().getTopicName(); String message = new String(ioTCoreMessage.getMessage().getPayload(), StandardCharsets.UTF_8); System.out.printf("Received new message on topic %s: %s%n", topic, message); } catch (Exception e) { System.err.println("Exception occurred while processing subscription response " + "message."); e.printStackTrace(); } } @Override public boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; } @Override public void onStreamClosed() { System.out.println("Subscribe to IoT Core stream closed."); } } }
Python (IPC client V1)
예: 메시지 구독
참고

이 예제에서는 AWS IoT Device SDK Python v2용 버전 1.5.4 이상을 사용하고 있다고 가정합니다.

import time import traceback import awsiot.greengrasscoreipc import awsiot.greengrasscoreipc.client as client from awsiot.greengrasscoreipc.model import ( IoTCoreMessage, QOS, SubscribeToIoTCoreRequest ) TIMEOUT = 10 ipc_client = awsiot.greengrasscoreipc.connect() class StreamHandler(client.SubscribeToIoTCoreStreamHandler): def __init__(self): super().__init__() def on_stream_event(self, event: IoTCoreMessage) -> None: try: message = str(event.message.payload, "utf-8") topic_name = event.message.topic_name # Handle message. except: traceback.print_exc() def on_stream_error(self, error: Exception) -> bool: # Handle error. return True # Return True to close stream, False to keep stream open. def on_stream_closed(self) -> None: # Handle close. pass topic = "my/topic" qos = QOS.AT_MOST_ONCE request = SubscribeToIoTCoreRequest() request.topic_name = topic request.qos = qos handler = StreamHandler() operation = ipc_client.new_subscribe_to_iot_core(handler) operation.activate(request) future_response = operation.get_response() future_response.result(TIMEOUT) # Keep the main thread alive, or the process will exit. while True: time.sleep(10) # To stop subscribing, close the operation stream. operation.close()
C++
예: 메시지 구독
#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IoTCoreResponseHandler : public SubscribeToIoTCoreStreamHandler { public: virtual ~IoTCoreResponseHandler() {} private: void OnStreamEvent(IoTCoreMessage *response) override { auto message = response->GetMessage(); if (message.has_value() && message.value().GetPayload().has_value()) { auto messageBytes = message.value().GetPayload().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); std::string topicName = message.value().GetTopicName().value().c_str(); // Handle message. } } bool OnStreamError(OperationError *error) override { // Handle error. return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { // Handle close. } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String topic("my/topic"); QOS qos = QOS_AT_MOST_ONCE; int timeout = 10; SubscribeToIoTCoreRequest request; request.SetTopicName(topic); request.SetQos(qos); auto streamHandler = MakeShared<IoTCoreResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToIoTCore(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); (void)error; // Handle operation error. } else { // Handle RPC error. } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }
JavaScript
예: 메시지 구독
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc"; import {IoTCoreMessage, QOS, SubscribeToIoTCoreRequest} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model"; import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc"; class SubscribeToIoTCore { private ipcClient: greengrasscoreipc.Client private readonly topic: string; constructor() { // define your own constructor, e.g. this.topic = "<define_your_topic>"; this.subscribeToIoTCore().then(r => console.log("Started workflow")); } private async subscribeToIoTCore() { try { const request: SubscribeToIoTCoreRequest = { topicName: this.topic, qos: QOS.AT_LEAST_ONCE, // you can change this depending on your use case } this.ipcClient = await getIpcClient(); const streamingOperation = this.ipcClient.subscribeToIoTCore(request); streamingOperation.on('message', (message: IoTCoreMessage) => { // parse the message depending on your use cases, e.g. if (message.message && message.message.payload) { const receivedMessage = message.message.payload.toString(); } }); streamingOperation.on('streamError', (error : RpcError) => { // define your own error handling logic }); streamingOperation.on('ended', () => { // define your own logic }); await streamingOperation.activate(); // Keep the main thread alive, or the process will exit. await new Promise((resolve) => setTimeout(resolve, 10000)) } catch (e) { // parse the error depending on your use cases throw e } } } export async function getIpcClient(){ try { const ipcClient = greengrasscoreipc.createClient(); await ipcClient.connect() .catch(error => { // parse the error depending on your use cases throw error; }); return ipcClient } catch (err) { // parse the error depending on your use cases throw err } } // starting point const subscribeToIoTCore = new SubscribeToIoTCore();

예제

다음 예제를 사용하여 구성 요소에서 AWS IoT Core MQTT IPC 서비스를 사용하는 방법을 알아보십시오.

다음 예제 레시피를 사용하면 구성 요소를 모든 주제에 게시할 수 있습니다.

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.IoTCorePublisherCpp", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes MQTT messages to IoT Core.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.IoTCorePublisherCpp:mqttproxy:1": { "policyDescription": "Allows access to publish to all topics.", "operations": [ "aws.greengrass#PublishToIoTCore" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Lifecycle": { "run": "{artifacts:path}/greengrassv2_iotcore_publisher" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.IoTCorePublisherCpp/1.0.0/greengrassv2_iotcore_publisher", "Permission": { "Execute": "OWNER" } } ] } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.IoTCorePublisherCpp ComponentVersion: 1.0.0 ComponentDescription: A component that publishes MQTT messages to IoT Core. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.mqttproxy: com.example.IoTCorePublisherCpp:mqttproxy:1: policyDescription: Allows access to publish to all topics. operations: - aws.greengrass#PublishToIoTCore resources: - "*" Manifests: - Lifecycle: run: "{artifacts:path}/greengrassv2_iotcore_publisher" Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.IoTCorePublisherCpp/1.0.0/greengrassv2_iotcore_publisher Permission: Execute: OWNER

다음 예제 C++ 애플리케이션은 AWS IoT Core MQTT IPC 서비스를 사용하여 메시지를 게시하는 방법을 보여줍니다. AWS IoT Core

#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String message("Hello from the Greengrass IPC MQTT publisher (C++)."); String topic("test/topic/cpp"); QOS qos = QOS_AT_LEAST_ONCE; int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } while (true) { PublishToIoTCoreRequest request; Vector<uint8_t> messageData({message.begin(), message.end()}); request.SetTopicName(topic); request.SetPayload(messageData); request.SetQos(qos); auto operation = ipcClient.NewPublishToIoTCore(); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } std::this_thread::sleep_for(std::chrono::seconds(5)); } return 0; }

다음 예제 레시피를 사용하면 구성 요소가 모든 주제를 구독할 수 있습니다.

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.IoTCoreSubscriberCpp", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to MQTT messages from IoT Core.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.IoTCoreSubscriberCpp:mqttproxy:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": [ "aws.greengrass#SubscribeToIoTCore" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Lifecycle": { "run": "{artifacts:path}/greengrassv2_iotcore_subscriber" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.IoTCoreSubscriberCpp/1.0.0/greengrassv2_iotcore_subscriber", "Permission": { "Execute": "OWNER" } } ] } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.IoTCoreSubscriberCpp ComponentVersion: 1.0.0 ComponentDescription: A component that subscribes to MQTT messages from IoT Core. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.mqttproxy: com.example.IoTCoreSubscriberCpp:mqttproxy:1: policyDescription: Allows access to subscribe to all topics. operations: - aws.greengrass#SubscribeToIoTCore resources: - "*" Manifests: - Lifecycle: run: "{artifacts:path}/greengrassv2_iotcore_subscriber" Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.IoTCoreSubscriberCpp/1.0.0/greengrassv2_iotcore_subscriber Permission: Execute: OWNER

다음 예제 C++ 애플리케이션은 AWS IoT Core MQTT IPC 서비스를 사용하여 메시지를 구독하는 방법을 보여줍니다. AWS IoT Core

#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IoTCoreResponseHandler : public SubscribeToIoTCoreStreamHandler { public: virtual ~IoTCoreResponseHandler() {} private: void OnStreamEvent(IoTCoreMessage *response) override { auto message = response->GetMessage(); if (message.has_value() && message.value().GetPayload().has_value()) { auto messageBytes = message.value().GetPayload().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); std::string messageTopic = message.value().GetTopicName().value().c_str(); std::cout << "Received new message on topic: " << messageTopic << std::endl; std::cout << "Message: " << messageString << std::endl; } } bool OnStreamError(OperationError *error) override { std::cout << "Received an operation error: "; if (error->GetMessage().has_value()) { std::cout << error->GetMessage().value(); } std::cout << std::endl; return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { std::cout << "Subscribe to IoT Core stream closed." << std::endl; } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String topic("test/topic/cpp"); QOS qos = QOS_AT_LEAST_ONCE; int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } SubscribeToIoTCoreRequest request; request.SetTopicName(topic); request.SetQos(qos); auto streamHandler = MakeShared<IoTCoreResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToIoTCore(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully subscribed to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to subscribe to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }