기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
AWS IoT Device SDK 를 사용하여 Greengrass 핵, 기타 구성 요소와 통신하고 AWS IoT Core
코어 기기에서 실행되는 구성 요소는 의 AWS IoT Greengrass Core 프로세스 간 통신 (IPC) 라이브러리를 사용하여 AWS IoT Greengrass 핵 및 기타 Greengrass 구성 요소와 AWS IoT Device SDK 통신할 수 있습니다. 를 사용하는 IPC 사용자 지정 구성 요소를 개발하고 AWS IoT Device SDK 실행하려면 를 사용하여 AWS IoT Greengrass Core IPC 서비스에 연결하고 작업을 수행해야 합니다. IPC
IPC인터페이스는 두 가지 유형의 작업을 지원합니다.
-
요청/응답
구성 요소는 IPC 서비스에 요청을 보내고 요청 결과가 포함된 응답을 받습니다.
-
Subscription
구성 요소는 IPC 서비스에 구독 요청을 보내고 이에 대한 응답으로 이벤트 메시지 스트림을 예상합니다. 구성 요소는 이벤트 메시지, 오류 및 스트림 폐쇄를 처리하는 구독 핸들러를 제공합니다. AWS IoT Device SDK 여기에는 각 IPC 작업에 대한 올바른 응답 및 이벤트 유형이 포함된 핸들러 인터페이스가 포함되어 있습니다. 자세한 내용은 IPC이벤트 스트림 구독 단원을 참조하십시오.
IPC클라이언트 버전
이후 버전의 Java 및 SDKs Python에서는 IPC 클라이언트 V2라는 향상된 버전의 IPC 클라이언트를 AWS IoT Greengrass 제공합니다. IPC클라이언트 V2:
-
IPC작업을 사용하기 위해 작성해야 하는 코드의 양을 줄이고 IPC 클라이언트 V1에서 발생할 수 있는 일반적인 오류를 방지하는 데 도움이 됩니다.
-
구독 핸들러 콜백을 별도의 스레드에서 호출하므로 이제 구독 핸들러 콜백에서 추가 IPC 함수 호출을 비롯한 차단 코드를 실행할 수 있습니다. IPC클라이언트 V1은 동일한 스레드를 사용하여 IPC 서버와 통신하고 구독 핸들러 콜백을 호출합니다.
-
Lambda 표현식 (Java) 또는 함수 (Python) 를 사용하여 구독 작업을 호출할 수 있습니다. IPC클라이언트 V1을 사용하려면 구독 핸들러 클래스를 정의해야 합니다.
-
각 작업의 동기 및 비동기 버전을 제공합니다. IPC IPC클라이언트 V1은 각 작업의 비동기 버전만 제공합니다.
이러한 개선 사항을 활용하려면 IPC 클라이언트 V2를 사용하는 것이 좋습니다. 그러나 이 설명서와 일부 온라인 콘텐츠의 많은 예제는 IPC 클라이언트 V1을 사용하는 방법만 보여줍니다. 다음 예제와 자습서를 사용하여 IPC 클라이언트 V2를 사용하는 샘플 구성 요소를 볼 수 있습니다.
현재 C++ AWS IoT Device SDK v2용 은 IPC 클라이언트 V1만 지원합니다.
프로세스 SDKs 간 통신이 지원됩니다.
AWS IoT Greengrass Core IPC 라이브러리는 다음 AWS IoT Device SDK 버전에 포함되어 있습니다.
AWS IoT Greengrass 코어 IPC 서비스에 연결
사용자 지정 구성 요소에서 프로세스 간 통신을 사용하려면 AWS IoT Greengrass Core 소프트웨어가 실행하는 IPC 서버 소켓에 대한 연결을 만들어야 합니다. 다음 작업을 완료하여 원하는 언어로 다운로드하고 사용하십시오. AWS IoT Device SDK
자바 v2 (IPC클라이언트 V2) AWS IoT Device SDK 에 사용하려면
-
자바AWS IoT Device SDK v2용 (v1.6.0 이상) 을 다운로드하십시오.
-
구성 요소에서 사용자 지정 코드를 실행하려면 다음 중 하나를 수행하십시오.
-
다음 코드를 사용하여 클라이언트를 생성합니다. IPC
try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) {
// Use client.
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Exception occurred when using IPC.", e);
System.exit(1);
}
Python v2 (IPC클라이언트 V2) AWS IoT Device SDK 에 사용하려면
-
AWS IoT Device SDK Python용 다운로드 (v1.9.0 이상)
-
구성 요소 SDK 레시피의 설치 라이프사이클에 의 설치 단계를 추가합니다.
-
AWS IoT Greengrass 코어 IPC 서비스에 대한 연결을 생성합니다. 다음 코드를 사용하여 IPC 클라이언트를 생성합니다.
from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
try:
ipc_client = GreengrassCoreIPCClientV2()
# Use IPC client.
except Exception:
print('Exception occurred when using IPC.', file=sys.stderr)
traceback.print_exc()
exit(1)
C++용 AWS IoT Device SDK v2를 빌드하려면 기기에 다음 도구가 있어야 합니다.
-
C++ 11 이상
-
CMake3.1 이상
-
다음 컴파일러 중 하나:
-
GCC4.8 이상
-
클랭 3.9 이상
-
MSVC2015년 또는 그 이후
C++ AWS IoT Device SDK v2용으로 사용하려면
-
C++AWS IoT Device SDK v2용 버전 (v1.17.0 이상) 을 다운로드하십시오.
-
의 설치 지침에 따라 소스에서 C++ README AWS IoT Device SDK v2용 버전을 빌드하십시오.
-
C++ 빌드 도구에서 이전 단계에서 빌드한 IPC Greengrass 라이브러리를 AWS::GreengrassIpc-cpp
연결합니다. 다음 CMakeLists.txt
예제는 Greengrass IPC 라이브러리를 빌드하는 데 사용하는 프로젝트에 연결합니다. CMake
cmake_minimum_required(VERSION 3.1)
project (greengrassv2_pubsub_subscriber)
file(GLOB MAIN_SRC
"*.h"
"*.cpp"
)
add_executable(${PROJECT_NAME} ${MAIN_SRC})
set_target_properties(${PROJECT_NAME} PROPERTIES
LINKER_LANGUAGE CXX
CXX_STANDARD 11)
find_package(aws-crt-cpp PATHS ~/sdk-cpp-workspace/build)
find_package(EventstreamRpc-cpp PATHS ~/sdk-cpp-workspace/build)
find_package(GreengrassIpc-cpp PATHS ~/sdk-cpp-workspace/build)
target_link_libraries(${PROJECT_NAME} AWS::GreengrassIpc-cpp)
-
구성 요소 코드에서 AWS IoT Greengrass Core IPC 서비스에 대한 연결을 생성하여 IPC 클라이언트 (Aws::Greengrass::GreengrassCoreIpcClient
) 를 생성합니다. 연결, IPC 연결 끊기 및 오류 이벤트를 처리하는 IPC 연결 수명 주기 핸들러를 정의해야 합니다. 다음 예제에서는 IPC 클라이언트가 연결되고 IPC 연결이 끊기고 오류가 발생할 때 인쇄하는 IPC 클라이언트와 연결 수명 주기 핸들러를 만듭니다.
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
std::cout << "OnConnectCallback" << std::endl;
}
void OnDisconnectCallback(RpcError error) override {
std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
exit(-1);
}
bool OnErrorCallback(RpcError error) override {
std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
return true;
}
};
int main() {
// Create the IPC client.
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
// Use the IPC client to create an operation request.
// Activate the operation request.
auto activate = operation.Activate(request, nullptr);
activate.wait();
// Wait for Greengrass Core to respond to the request.
auto responseFuture = operation.GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
// Check the result of the request.
auto response = responseFuture.get();
if (response) {
std::cout << "Successfully published to topic: " << topic << std::endl;
} else {
// An error occurred.
std::cout << "Failed to publish to topic: " << topic << std::endl;
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
} else {
std::cout << "RPC error: " << response.GetRpcError() << std::endl;
}
exit(-1);
}
return 0;
}
-
구성 요소에서 사용자 지정 코드를 실행하려면 코드를 이진 아티팩트로 빌드하고 구성 요소 레시피에서 이진 아티팩트를 실행하십시오. AWS IoT Greengrass Core 소프트웨어에서 바이너리 아티팩트를 실행할 수 OWNER
있도록 아티팩트의 Execute
권한을 설정합니다.
구성 요소 레시피의 Manifests
섹션은 다음 예제와 비슷할 수 있습니다.
- JSON
-
{
...
"Manifests": [
{
"Lifecycle": {
"run": "{artifacts:path}/greengrassv2_pubsub_subscriber"
},
"Artifacts": [
{
"URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber",
"Permission": {
"Execute": "OWNER"
}
}
]
}
]
}
- YAML
-
...
Manifests:
- Lifecycle:
run: {artifacts:path}/greengrassv2_pubsub_subscriber
Artifacts:
- URI: s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber
Permission:
Execute: OWNER
NodeJS와 함께 사용할 AWS IoT Device SDK JavaScript v2용으로 빌드하려면 기기에 다음 도구가 있어야 합니다.
-
NodeJS 10.0 이상
-
CMake3.1 이상
AWS IoT Device SDK JavaScript v2용 (IPC클라이언트 V1) 을 사용하려면
-
AWS IoT Device SDK
JavaScript v2용 버전 (v1.12.10 이상) 을 다운로드하십시오.
-
의 설치 지침에 따라 소스에서 AWS IoT Device SDK JavaScript v2용 버전을 빌드하십시오. README
-
AWS IoT Greengrass 코어 IPC 서비스에 대한 연결을 생성합니다. 다음 단계를 완료하여 IPC 클라이언트를 만들고 연결을 설정합니다.
-
다음 코드를 사용하여 IPC 클라이언트를 생성합니다.
import * as greengrascoreipc from 'aws-iot-device-sdk-v2';
let client = greengrascoreipc.createClient();
-
다음 코드를 사용하여 구성 요소에서 Greengrass 핵으로의 연결을 설정합니다.
await client.connect();
구성 요소가 작업을 수행할 수 있도록 승인하십시오. IPC
사용자 지정 구성 요소가 일부 IPC 작업을 사용할 수 있도록 하려면 구성 요소가 특정 리소스에서 작업을 수행하도록 허용하는 권한 부여 정책을 정의해야 합니다. 각 권한 부여 정책은 정책에서 허용하는 작업 목록과 리소스 목록을 정의합니다. 예를 들어 게시/구독 메시징 IPC 서비스는 주제 리소스에 대한 게시 및 구독 작업을 정의합니다. *
와일드카드를 사용하여 모든 작업 또는 모든 리소스에 대한 액세스를 허용할 수 있습니다.
구성 매개 변수를 사용하여 권한 부여 정책을 정의합니다. accessControl
구성 매개 변수는 구성 요소 레시피에서 설정하거나 구성 요소를 배포할 때 설정할 수 있습니다. accessControl
객체는 IPC 서비스 식별자를 권한 부여 정책 목록에 매핑합니다. 각 IPC 서비스에 대해 여러 권한 부여 정책을 정의하여 액세스를 제어할 수 있습니다. 각 권한 부여 정책에는 모든 구성 요소 간에 고유해야 하는 정책 ID가 있습니다.
고유한 정책을 만들려면 구성 요소 이름IDs, IPC 서비스 이름 및 카운터를 조합하면 됩니다. 예를 com.example.HelloWorld
들어 이름이 지정된 구성 요소는 다음과 같은 두 개의 게시/구독 권한 부여 정책을 정의할 수 있습니다. IDs
권한 부여 정책은 다음 형식을 사용합니다. 이 개체는 accessControl
구성 매개변수입니다.
- JSON
-
{
"IPC service identifier
": {
"policyId
": {
"policyDescription": "description
",
"operations": [
"operation1
",
"operation2
"
],
"resources": [
"resource1
",
"resource2
"
]
}
}
}
- YAML
-
IPC service identifier
:
policyId
:
policyDescription: description
operations:
- operation1
- operation2
resources:
- resource1
- resource2
권한 부여 정책의 와일드카드
권한 부여 정책 resources
요소에 *
와일드카드를 사용하여 단일 IPC 권한 부여 정책에서 여러 리소스에 대한 액세스를 허용할 수 있습니다.
-
Greengrass Nucleus의 모든 버전에서는 단일 *
문자를 리소스로 지정하여 모든 리소스에 대한 액세스를 허용할 수 있습니다.
-
Greengrass nucleus v2.6.0 이상에서는 리소스의 문자를 모든 *
문자 조합과 일치하도록 지정할 수 있습니다. 예를 들어 각 장치 이름이 로 시작되는 공장 내 모든 온도 조절기 장치의 상태 항목에 대한 액세스를 factory/1/devices/Thermostat*/status
허용하도록 지정할 수 있습니다. Thermostat
AWS IoT Core MQTTIPC서비스에 대한 권한 부여 정책을 정의할 때 MQTT 와일드카드 (+
및#
) 를 사용하여 여러 리소스를 매칭할 수도 있습니다. 자세한 내용은 AWS IoT Core MQTTIPC권한 부여 정책의 MQTT 와일드카드를 참조하십시오.
권한 부여 정책의 레시피 변수
Greengrass nucleus v2.6.0 이상을 사용하고 Greengrass nucleus의 interpolateComponentConfiguration구성 옵션을 로 설정하면 권한 부여 정책에서 레시피 변수를 사용할 수 true 있습니다. {iot:thingName} MQTT주제나 디바이스 섀도와 같이 코어 디바이스 이름을 포함하는 권한 부여 정책이 필요한 경우 이 레시피 변수를 사용하여 코어 디바이스 그룹에 대한 단일 권한 부여 정책을 구성할 수 있습니다. 예를 들어 섀도우 IPC 작업을 위해 구성 요소가 다음 리소스에 액세스하도록 허용할 수 있습니다.
$aws/things/{iot:thingName}/shadow/
권한 부여 정책의 특수 문자
권한 부여 정책에서 리터럴 *
또는 ?
문자를 지정하려면 이스케이프 시퀀스를 사용해야 합니다. 다음 이스케이프 시퀀스는 AWS IoT Greengrass Core 소프트웨어가 문자의 특수 의미 대신 리터럴 값을 사용하도록 지시합니다. 예를 들어, 문자는 모든 *
문자 조합과 일치하는 와일드카드입니다.
리터럴 문자 |
이스케이프 시퀀스 |
참고 |
*
|
${*}
|
|
?
|
${?}
|
AWS IoT Greengrass 현재 단일 문자와 일치하는 ? 와일드카드를 지원하지 않습니다.
|
$
|
${$}
|
이 이스케이프 시퀀스를 사용하여 포함된 ${ 리소스를 일치시키십시오. 예를 들어${resourceName} , 이름이 지정된 리소스와 일치시키려면 다음을 지정해야 합니다${$}{resourceName} . 그렇지 않으면 로 시작하는 주제에 대한 액세스를 허용하는 등의 $ 리터럴을 사용하여 포함하는 $ 리소스를 일치시킬 수 있습니다. $aws
|
권한 부여 정책 예제
다음 권한 부여 정책 예제를 참조하여 구성 요소에 대한 권한 부여 정책을 구성할 수 있습니다.
예 권한 부여 정책이 포함된 구성 요소 레시피 예시
다음 예제 구성 요소 레시피에는 권한 부여 정책을 정의하는 accessControl
객체가 포함되어 있습니다. 이 정책은 com.example.HelloWorld
구성 요소가 test/topic
주제에 게시할 수 있는 권한을 부여합니다.
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.HelloWorld",
"ComponentVersion": "1.0.0",
"ComponentDescription": "A component that publishes messages.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.HelloWorld:pubsub:1": {
"policyDescription": "Allows access to publish to test/topic.",
"operations": [
"aws.greengrass#PublishToTopic"
],
"resources": [
"test/topic"
]
}
}
}
}
},
"Manifests": [
{
"Lifecycle": {
"run": "java -jar {artifacts:path}/HelloWorld.jar"
}
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.HelloWorld
ComponentVersion: '1.0.0'
ComponentDescription: A component that publishes messages.
ComponentPublisher: Amazon
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.pubsub:
"com.example.HelloWorld:pubsub:1":
policyDescription: Allows access to publish to test/topic.
operations:
- "aws.greengrass#PublishToTopic"
resources:
- "test/topic"
Manifests:
- Lifecycle:
run: |-
java -jar {artifacts:path}/HelloWorld.jar
예 권한 부여 정책을 사용한 구성 요소 구성 업데이트의 예
배포의 다음 예제 구성 업데이트는 권한 부여 정책을 정의하는 accessControl
객체를 사용하여 구성 요소를 구성하도록 지정합니다. 이 정책은 com.example.HelloWorld
구성 요소가 test/topic
주제에 게시할 수 있는 권한을 부여합니다.
- Console
-
- 병합할 구성
-
{
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.HelloWorld:pubsub:1": {
"policyDescription": "Allows access to publish to test/topic.",
"operations": [
"aws.greengrass#PublishToTopic"
],
"resources": [
"test/topic"
]
}
}
}
}
- AWS CLI
-
다음 명령어는 코어 디바이스에 배포를 생성합니다.
aws greengrassv2 create-deployment --cli-input-json file://hello-world-deployment.json
hello-world-deployment.json
파일에는 다음 JSON 문서가 들어 있습니다.
{
"targetArn": "arn:aws:iot:us-west-2:123456789012:thing/MyGreengrassCore",
"deploymentName": "Deployment for MyGreengrassCore",
"components": {
"com.example.HelloWorld": {
"componentVersion": "1.0.0",
"configurationUpdate": {
"merge": "{\"accessControl\":{\"aws.greengrass.ipc.pubsub\":{\"com.example.HelloWorld:pubsub:1\":{\"policyDescription\":\"Allows access to publish to test/topic.\",\"operations\":[\"aws.greengrass#PublishToTopic\"],\"resources\":[\"test/topic\"]}}}}"
}
}
}
}
- Greengrass CLI
-
다음 Greengrass CLI 명령은 코어 기기에 로컬 배포를 생성합니다.
sudo greengrass-cli deployment create \
--recipeDir recipes \
--artifactDir artifacts \
--merge "com.example.HelloWorld=1.0.0" \
--update-config hello-world-configuration.json
이 hello-world-configuration.json
파일에는 다음 JSON 문서가 들어 있습니다.
{
"com.example.HelloWorld": {
"MERGE": {
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.HelloWorld:pubsub:1": {
"policyDescription": "Allows access to publish to test/topic.",
"operations": [
"aws.greengrass#PublishToTopic"
],
"resources": [
"test/topic"
]
}
}
}
}
}
}
IPC이벤트 스트림 구독
IPC작업을 사용하여 Greengrass 코어 디바이스에서 이벤트 스트림을 구독할 수 있습니다. 구독 작업을 사용하려면 구독 핸들러를 정의하고 서비스에 대한 요청을 생성하십시오. IPC 그러면 코어 기기가 구성 요소에 이벤트 메시지를 스트리밍할 때마다 IPC 클라이언트가 구독 핸들러의 함수를 실행합니다.
구독을 종료하여 이벤트 메시지 처리를 중지할 수 있습니다. 이렇게 하려면 구독을 여는 데 사용한 구독 작업 객체에서 closeStream()
close()
(Java), Close()
(Python) 또는 (C++) 를 호출해야 합니다.
AWS IoT Greengrass Core IPC 서비스는 다음과 같은 구독 작업을 지원합니다.
구독 핸들러를 정의합니다.
구독 핸들러를 정의하려면 이벤트 메시지, 오류 및 스트림 폐쇄를 처리하는 콜백 함수를 정의하십시오. IPC클라이언트 V1을 사용하는 경우 클래스에서 이러한 함수를 정의해야 합니다. 이후 버전의 Java 및 SDKs Python에서 사용할 수 있는 IPC 클라이언트 V2를 사용하는 경우 구독 핸들러 클래스를 만들지 않고도 이러한 함수를 정의할 수 있습니다.
- Java
-
IPC클라이언트 V1을 사용하는 경우 일반 software.amazon.awssdk.eventstreamrpc.StreamResponseHandler<StreamEventType
>
인터페이스를 구현해야 합니다.StreamEventType
구독 작업을 위한 이벤트 메시지 유형입니다. 다음 함수를 정의하여 이벤트 메시지, 오류 및 스트림 폐쇄를 처리합니다.
IPC클라이언트 V2를 사용하는 경우 구독 핸들러 클래스 외부에서 이러한 함수를 정의하거나 람다 식을 사용할 수 있습니다.
void onStreamEvent(StreamEventType
event)
-
IPC클라이언트가 메시지 또는 구성 요소 업데이트 알림과 같은 이벤트 메시지를 수신할 때 호출하는 MQTT 콜백입니다.
boolean onStreamError(Throwable error)
-
스트림 오류가 발생할 때 IPC 클라이언트가 호출하는 콜백입니다.
오류로 인해 구독 스트림을 닫으려면 true를 반환하고, 스트림을 열린 상태로 유지하려면 false를 반환합니다.
void onStreamClosed()
-
스트림이 종료될 때 IPC 클라이언트가 호출하는 콜백입니다.
- Python
-
IPC클라이언트 V1을 사용하는 경우 구독 작업에 해당하는 스트림 응답 핸들러 클래스를 확장해야 합니다. AWS IoT Device SDK 여기에는 각 구독 작업에 대한 구독 핸들러 클래스가 포함됩니다.StreamEventType
구독 작업에 대한 이벤트 메시지 유형입니다. 다음 함수를 정의하여 이벤트 메시지, 오류 및 스트림 폐쇄를 처리합니다.
IPC클라이언트 V2를 사용하는 경우 구독 핸들러 클래스 외부에서 이러한 함수를 정의하거나 람다 식을 사용할 수 있습니다.
def on_stream_event(self, event:
StreamEventType
) -> None
-
IPC클라이언트가 메시지 또는 구성 요소 업데이트 알림과 같은 이벤트 메시지를 수신할 때 호출하는 MQTT 콜백입니다.
def on_stream_error(self, error: Exception) -> bool
-
스트림 오류가 발생할 때 IPC 클라이언트가 호출하는 콜백입니다.
오류로 인해 구독 스트림을 닫으려면 true를 반환하고, 스트림을 열린 상태로 유지하려면 false를 반환합니다.
def on_stream_closed(self) -> None
-
스트림이 종료될 때 IPC 클라이언트가 호출하는 콜백입니다.
- C++
-
구독 작업에 해당하는 스트림 응답 핸들러 클래스에서 파생되는 클래스를 구현하십시오. AWS IoT Device SDK 여기에는 각 구독 작업에 대한 구독 핸들러 기본 클래스가 포함됩니다.StreamEventType
구독 작업에 대한 이벤트 메시지 유형입니다. 다음 함수를 정의하여 이벤트 메시지, 오류 및 스트림 폐쇄를 처리합니다.
void OnStreamEvent(StreamEventType
*event)
-
IPC클라이언트가 메시지 또는 구성 요소 업데이트 알림과 같은 이벤트 메시지를 수신할 때 호출하는 MQTT 콜백입니다.
bool OnStreamError(OperationError *error)
-
스트림 오류가 발생할 때 IPC 클라이언트가 호출하는 콜백입니다.
오류로 인해 구독 스트림을 닫으려면 true를 반환하고, 스트림을 열린 상태로 유지하려면 false를 반환합니다.
void OnStreamClosed()
-
스트림이 종료될 때 IPC 클라이언트가 호출하는 콜백입니다.
- JavaScript
-
구독 작업에 해당하는 스트림 응답 핸들러 클래스에서 파생되는 클래스를 구현하십시오. AWS IoT Device SDK 여기에는 각 구독 작업에 대한 구독 핸들러 기본 클래스가 포함됩니다.StreamEventType
구독 작업에 대한 이벤트 메시지 유형입니다. 다음 함수를 정의하여 이벤트 메시지, 오류 및 스트림 폐쇄를 처리합니다.
on(event: 'ended', listener: StreamingOperationEndedListener)
-
스트림이 종료될 때 IPC 클라이언트가 호출하는 콜백입니다.
on(event: 'streamError', listener: StreamingRpcErrorListener)
-
스트림 오류가 발생했을 때 IPC 클라이언트가 호출하는 콜백입니다.
오류로 인해 구독 스트림을 닫으려면 true를 반환하고, 스트림을 열린 상태로 유지하려면 false를 반환합니다.
on(event: 'message', listener: (message: InboundMessageType) => void)
-
IPC클라이언트가 메시지 또는 구성 요소 업데이트 알림과 같은 이벤트 메시지를 수신할 때 호출하는 MQTT 콜백입니다.
서브스크립션 핸들러 예시
다음 예제는 SubscribeToTopic 작업 및 구독 핸들러를 사용하여 로컬 게시/구독 메시지를 구독하는 방법을 보여줍니다.
- Java (IPC client V2)
-
예: 로컬 게시/구독 메시지 구독
package com.aws.greengrass.docs.samples.ipc;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.*;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
public class SubscribeToTopicV2 {
public static void main(String[] args) {
String topic = args[0];
try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) {
SubscribeToTopicRequest request = new SubscribeToTopicRequest().withTopic(topic);
GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToTopicResponse,
SubscribeToTopicResponseHandler> response =
ipcClient.subscribeToTopic(request, SubscribeToTopicV2::onStreamEvent,
Optional.of(SubscribeToTopicV2::onStreamError),
Optional.of(SubscribeToTopicV2::onStreamClosed));
SubscribeToTopicResponseHandler responseHandler = response.getHandler();
System.out.println("Successfully subscribed to topic: " + topic);
// Keep the main thread alive, or the process will exit.
try {
while (true) {
Thread.sleep(10000);
}
} catch (InterruptedException e) {
System.out.println("Subscribe interrupted.");
}
// To stop subscribing, close the stream.
responseHandler.closeStream();
} catch (Exception e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while publishing to topic: " + topic);
} else {
System.err.println("Exception occurred when using IPC.");
}
e.printStackTrace();
System.exit(1);
}
}
public static void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) {
try {
BinaryMessage binaryMessage = subscriptionResponseMessage.getBinaryMessage();
String message = new String(binaryMessage.getMessage(), StandardCharsets.UTF_8);
String topic = binaryMessage.getContext().getTopic();
System.out.printf("Received new message on topic %s: %s%n", topic, message);
} catch (Exception e) {
System.err.println("Exception occurred while processing subscription response " +
"message.");
e.printStackTrace();
}
}
public static boolean onStreamError(Throwable error) {
System.err.println("Received a stream error.");
error.printStackTrace();
return false; // Return true to close stream, false to keep stream open.
}
public static void onStreamClosed() {
System.out.println("Subscribe to topic stream closed.");
}
}
- Python (IPC client V2)
-
예: 로컬 게시/구독 메시지 구독
import sys
import time
import traceback
from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
from awsiot.greengrasscoreipc.model import (
SubscriptionResponseMessage,
UnauthorizedError
)
def main():
args = sys.argv[1:]
topic = args[0]
try:
ipc_client = GreengrassCoreIPCClientV2()
# Subscription operations return a tuple with the response and the operation.
_, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event,
on_stream_error=on_stream_error, on_stream_closed=on_stream_closed)
print('Successfully subscribed to topic: ' + topic)
# Keep the main thread alive, or the process will exit.
try:
while True:
time.sleep(10)
except InterruptedError:
print('Subscribe interrupted.')
# To stop subscribing, close the stream.
operation.close()
except UnauthorizedError:
print('Unauthorized error while subscribing to topic: ' +
topic, file=sys.stderr)
traceback.print_exc()
exit(1)
except Exception:
print('Exception occurred', file=sys.stderr)
traceback.print_exc()
exit(1)
def on_stream_event(event: SubscriptionResponseMessage) -> None:
try:
message = str(event.binary_message.message, 'utf-8')
topic = event.binary_message.context.topic
print('Received new message on topic %s: %s' % (topic, message))
except:
traceback.print_exc()
def on_stream_error(error: Exception) -> bool:
print('Received a stream error.', file=sys.stderr)
traceback.print_exc()
return False # Return True to close stream, False to keep stream open.
def on_stream_closed() -> None:
print('Subscribe to topic stream closed.')
if __name__ == '__main__':
main()
- C++
-
예: 로컬 게시/구독 메시지 구독
#include <iostream>
#include </crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class SubscribeResponseHandler : public SubscribeToTopicStreamHandler {
public:
virtual ~SubscribeResponseHandler() {}
private:
void OnStreamEvent(SubscriptionResponseMessage *response) override {
auto jsonMessage = response->GetJsonMessage();
if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) {
auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable();
// Handle JSON message.
} else {
auto binaryMessage = response->GetBinaryMessage();
if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) {
auto messageBytes = binaryMessage.value().GetMessage().value();
std::string messageString(messageBytes.begin(), messageBytes.end());
// Handle binary message.
}
}
}
bool OnStreamError(OperationError *error) override {
// Handle error.
return false; // Return true to close stream, false to keep stream open.
}
void OnStreamClosed() override {
// Handle close.
}
};
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
// Handle connection to IPC service.
}
void OnDisconnectCallback(RpcError error) override {
// Handle disconnection from IPC service.
}
bool OnErrorCallback(RpcError error) override {
// Handle IPC service connection error.
return true;
}
};
int main() {
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
String topic("my/topic");
int timeout = 10;
SubscribeToTopicRequest request;
request.SetTopic(topic);
//SubscribeResponseHandler streamHandler;
auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator());
auto operation = ipcClient.NewSubscribeToTopic(streamHandler);
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (!response) {
// Handle error.
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
(void)error;
// Handle operation error.
} else {
// Handle RPC error.
}
exit(-1);
}
// Keep the main thread alive, or the process will exit.
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(10));
}
operation->Close();
return 0;
}
- JavaScript
-
예: 로컬 게시/구독 메시지 구독
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc";
import {SubscribeToTopicRequest, SubscriptionResponseMessage} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model";
import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc";
class SubscribeToTopic {
private ipcClient : greengrasscoreipc.Client
private readonly topic : string;
constructor() {
// define your own constructor, e.g.
this.topic = "<define_your_topic>";
this.subscribeToTopic().then(r => console.log("Started workflow"));
}
private async subscribeToTopic() {
try {
this.ipcClient = await getIpcClient();
const subscribeToTopicRequest : SubscribeToTopicRequest = {
topic: this.topic,
}
const streamingOperation = this.ipcClient.subscribeToTopic(subscribeToTopicRequest, undefined); // conditionally apply options
streamingOperation.on("message", (message: SubscriptionResponseMessage) => {
// parse the message depending on your use cases, e.g.
if(message.binaryMessage && message.binaryMessage.message) {
const receivedMessage = message.binaryMessage?.message.toString();
}
});
streamingOperation.on("streamError", (error : RpcError) => {
// define your own error handling logic
})
streamingOperation.on("ended", () => {
// define your own logic
})
await streamingOperation.activate();
// Keep the main thread alive, or the process will exit.
await new Promise((resolve) => setTimeout(resolve, 10000))
} catch (e) {
// parse the error depending on your use cases
throw e
}
}
}
export async function getIpcClient(){
try {
const ipcClient = greengrasscoreipc.createClient();
await ipcClient.connect()
.catch(error => {
// parse the error depending on your use cases
throw error;
});
return ipcClient
} catch (err) {
// parse the error depending on your use cases
throw err
}
}
// starting point
const subscribeToTopic = new SubscribeToTopic();
IPC베스트 프랙티스
사용자 지정 구성 IPC 요소에서 사용하는 모범 사례는 IPC 클라이언트 V1과 IPC 클라이언트 V2 간에 다릅니다. 사용하는 IPC 클라이언트 버전의 모범 사례를 따르세요.
- IPC client V2
-
IPC클라이언트 V2는 별도의 스레드에서 콜백 함수를 실행하므로 IPC 클라이언트 V1에 비해 구독 핸들러 함수를 IPC 사용하고 작성할 때 따라야 할 지침이 적습니다.
- IPC client V1
-
IPC클라이언트 V1은 IPC 서버와 통신하고 구독 핸들러를 호출하는 단일 스레드를 사용합니다. 구독 핸들러 함수를 작성할 때는 이 동기 동작을 고려해야 합니다.
-
하나의 클라이언트를 재사용하십시오. IPC
IPC클라이언트를 만든 후에는 열어 두고 모든 IPC 작업에 재사용하십시오. 여러 클라이언트를 생성하면 추가 리소스가 사용되고 리소스 누수가 발생할 수 있습니다.
-
차단 코드를 비동기적으로 실행합니다.
IPC스레드가 차단된 동안에는 클라이언트 V1이 새 요청을 보내거나 새 이벤트 메시지를 처리할 수 없습니다. 핸들러 함수에서 실행하는 별도의 스레드에서 차단 코드를 실행해야 합니다. 차단 코드에는 sleep
호출, 지속적으로 실행되는 루프, 완료하는 데 시간이 걸리는 동기 I/O 요청이 포함됩니다.
-
새 IPC 요청을 비동기적으로 전송합니다.
응답을 기다리면 요청이 핸들러 함수를 차단하기 때문에 IPC 클라이언트 V1은 구독 핸들러 함수 내에서 새 요청을 보낼 수 없습니다. 핸들러 함수에서 실행하는 별도의 스레드에서 IPC 요청을 보내야 합니다.
-
예외 처리
IPC클라이언트 V1은 구독 핸들러 함수에서 포착되지 않은 예외를 처리하지 않습니다. 핸들러 함수에서 예외가 발생하면 구독이 종료되고 해당 예외는 구성 요소 로그에 표시되지 않습니다. 구독을 열린 상태로 유지하고 코드에서 발생하는 오류를 기록하려면 핸들러 함수에서 예외를 포착해야 합니다.