Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Le service IPC de messagerie AWS IoT Core MQTT vous permet d'envoyer et de recevoir des messages MQTT depuis et vers. AWS IoT Core Les composants peuvent publier des messages AWS IoT Core et s'abonner à des sujets pour agir sur les messages MQTT provenant d'autres sources. Pour plus d'informations sur l' AWS IoT Core implémentation de MQTT, consultez MQTT dans le Guide du AWS IoT Core développeur.
Note
Ce service IPC de messagerie MQTT vous permet d'échanger des messages avec. AWS IoT Core Pour plus d'informations sur l'échange de messages entre composants, consultezPublier/souscrire des messages locaux.
Versions minimales du SDK
Le tableau suivant répertorie les versions minimales Kit SDK des appareils AWS IoT que vous devez utiliser pour publier et vous abonner à des messages MQTT en provenance AWS IoT Core et à destination.
SDK | Version minimale |
---|---|
v1.2.10 |
|
v1.5.3 |
|
v1.17.0 |
|
v1.12.0 |
Autorisation
Pour utiliser la messagerie AWS IoT Core MQTT dans un composant personnalisé, vous devez définir des politiques d'autorisation qui permettent à votre composant d'envoyer et de recevoir des messages sur des sujets. Pour plus d'informations sur la définition des politiques d'autorisation, consultezAutoriser les composants à effectuer des opérations IPC.
Les politiques d'autorisation pour AWS IoT Core la messagerie MQTT présentent les propriétés suivantes.
Identifiant du service IPC : aws.greengrass.ipc.mqttproxy
Opération | Description | Ressources |
---|---|---|
|
Permet à un composant de publier des messages AWS IoT Core sur les sujets MQTT que vous spécifiez. |
Chaîne de rubrique, telle que |
|
Permet à un composant de s'abonner à des messages provenant AWS IoT Core des sujets que vous spécifiez. |
Chaîne de rubrique, telle que |
|
Permet à un composant de publier des messages AWS IoT Core MQTT et de s'y abonner pour les sujets que vous spécifiez. |
Chaîne de rubrique, telle que |
Caractères génériques MQTT dans les politiques d'autorisation AWS IoT Core MQTT
Vous pouvez utiliser des caractères génériques MQTT dans les politiques d'autorisation AWS IoT Core MQTT IPC. Les composants peuvent publier et s'abonner à des rubriques qui correspondent au filtre de rubrique que vous autorisez dans une politique d'autorisation. Par exemple, si la politique d'autorisation d'un composant accorde l'accès àtest/topic/#
, le composant peut s'abonner àtest/topic/#
, publier et s'abonner àtest/topic/filter
.
Variables de recette dans les AWS IoT Core politiques d'autorisation MQTT
Si vous utilisez la version 2.6.0 ou ultérieure du noyau Greengrass, vous pouvez utiliser la variable de {iot:thingName}
recette dans les politiques d'autorisation. Cette fonctionnalité vous permet de configurer une politique d'autorisation unique pour un groupe de périphériques principaux, chaque périphérique principal ne pouvant accéder qu'aux rubriques contenant son propre nom. Par exemple, vous pouvez autoriser un composant à accéder à la ressource thématique suivante.
devices/{iot:thingName}/messages
Pour plus d’informations, consultez Variables de recette et Utiliser des variables de recette dans les mises à jour de fusion.
Exemples de politiques d'autorisation
Vous pouvez vous référer aux exemples de politiques d'autorisation suivants pour vous aider à configurer les politiques d'autorisation pour vos composants.
Exemple de politique d'autorisation avec accès illimité
L'exemple de politique d'autorisation suivant permet à un composant de publier et de s'abonner à toutes les rubriques.
{
"accessControl": {
"aws.greengrass.ipc.mqttproxy": {
"com.example.MyIoTCorePubSubComponent
:mqttproxy:1": {
"policyDescription": "Allows access to publish/subscribe to all topics.",
"operations": [
"aws.greengrass#PublishToIoTCore",
"aws.greengrass#SubscribeToIoTCore"
],
"resources": [
"*"
]
}
}
}
}
Exemple de politique d'autorisation avec accès limité
L'exemple de politique d'autorisation suivant permet à un composant de publier et de s'abonner à deux rubriques nommées factory/1/events
etfactory/1/actions
.
{
"accessControl": {
"aws.greengrass.ipc.mqttproxy": {
"com.example.MyIoTCorePubSubComponent
:mqttproxy:1": {
"policyDescription": "Allows access to publish/subscribe to factory 1 topics.",
"operations": [
"aws.greengrass#PublishToIoTCore",
"aws.greengrass#SubscribeToIoTCore"
],
"resources": [
"factory/1/actions",
"factory/1/events"
]
}
}
}
}
Exemple de politique d'autorisation pour un groupe de périphériques principaux
Important
Cet exemple utilise une fonctionnalité disponible pour les versions 2.6.0 et ultérieures du composant Greengrass nucleus. Greengrass nucleus v2.6.0 ajoute la prise en charge de la plupart des variables de recette, notamment dans les configurations de composants{iot:thingName}
.
L'exemple de politique d'autorisation suivant permet à un composant de publier et de s'abonner à une rubrique contenant le nom du périphérique principal qui exécute le composant.
{
"accessControl": {
"aws.greengrass.ipc.mqttproxy": {
"com.example.MyIoTCorePubSubComponent
:mqttproxy:1": {
"policyDescription": "Allows access to publish/subscribe to all topics.",
"operations": [
"aws.greengrass#PublishToIoTCore",
"aws.greengrass#SubscribeToIoTCore"
],
"resources": [
"factory/1/devices/{iot:thingName}/controls"
]
}
}
}
}
PublishToIoTCore
Publie un message MQTT AWS IoT Core sur un sujet.
Lorsque vous publiez des messages MQTT sur AWS IoT Core, il existe un quota de 100 transactions par seconde. Si vous dépassez ce quota, les messages sont mis en file d'attente pour traitement sur l'appareil Greengrass. Il existe également un quota de 512 Ko de données par seconde et un quota de 20 000 publications par seconde à l'échelle du compte (2 000 dans certains Régions AWS cas). Pour plus d'informations sur les limites du courtier de messages MQTT dans AWS IoT Core, consultez les sections Limites et quotas du courtier de AWS IoT Core messages et du protocole.
Si vous dépassez ces quotas, l'appareil Greengrass limite la publication de messages à. AWS IoT Core Les messages sont stockés dans un spouleur en mémoire. Par défaut, la mémoire allouée au spouleur est de 2,5 Mo. Si le spouleur se remplit, les nouveaux messages sont rejetés. Vous pouvez augmenter la taille du spouleur. Pour plus d'informations, consultez la section Configurationdans la documentation Noyau de Greengrass. Pour éviter de remplir le spouleur et d'avoir à augmenter la mémoire allouée, limitez les demandes de publication à un maximum de 100 demandes par seconde.
Lorsque votre application doit envoyer des messages à un débit plus élevé ou des messages plus volumineux, pensez à utiliser le Gestionnaire de flux pour envoyer des messages à Kinesis Data Streams. Le composant du gestionnaire de flux est conçu pour transférer de gros volumes de données vers le AWS Cloud. Pour de plus amples informations, veuillez consulter Gérez les flux de données sur les appareils principaux de Greengrass.
Demande
La demande de cette opération comporte les paramètres suivants :
topicName
(Python :topic_name
)-
Rubrique dans laquelle le message doit être publié.
qos
-
Les QoS MQTT à utiliser. Cette énumération possède
QOS
les valeurs suivantes :-
AT_MOST_ONCE
— QoS 0. Le message MQTT est délivré au plus une fois. -
AT_LEAST_ONCE
— QoS 1. Le message MQTT est délivré au moins une fois.
-
payload
-
(Facultatif) La charge utile du message sous forme de blob.
Les fonctionnalités suivantes sont disponibles pour la version 2.10.0 et les versions ultérieures Noyau de Greengrass lors de l'utilisation de MQTT 5. Ces fonctionnalités sont ignorées lorsque vous utilisez MQTT 3.1.1. Le tableau suivant répertorie la version minimale du SDK de l' AWS IoT appareil que vous devez utiliser pour accéder à ces fonctionnalités.
SDK | Version minimale |
---|---|
Kit SDK des appareils AWS IoT pour Python
v2 |
v1.15.0 |
Kit SDK des appareils AWS IoT pour Java
v2 |
v1.13.0 |
Kit SDK des appareils AWS IoT pour C++
v2 |
v1.24.0 |
Kit SDK des appareils AWS IoT pour JavaScript v2 |
v1.13.0 |
payloadFormat
-
(Facultatif) Format de la charge utile du message. Si vous ne définissez pas le
payloadFormat
, le type est supposé êtreBYTES
. L'enum possède les valeurs suivantes :-
BYTES
— Le contenu de la charge utile est un blob binaire. -
UTF8
— Le contenu de la charge utile est une UTF8 chaîne de caractères.
-
retain
-
(Facultatif) Indique s'il faut définir l'option de conservation MQTT sur
true
lors de la publication. userProperties
-
(Facultatif) Liste des
UserProperty
objets spécifiques à l'application à envoyer. L'UserProperty
objet est défini comme suit :UserProperty: key: string value: string
messageExpiryIntervalSeconds
-
(Facultatif) Nombre de secondes avant que le message n'expire et ne soit supprimé par le serveur. Si cette valeur n'est pas définie, le message n'expire pas.
correlationData
-
(Facultatif) Informations ajoutées à la demande qui peuvent être utilisées pour associer une demande à une réponse.
responseTopic
-
(Facultatif) Rubrique à utiliser pour le message de réponse.
contentType
-
(Facultatif) Identifiant spécifique à l'application du type de contenu du message.
Réponse
Cette opération ne fournit aucune information dans sa réponse.
Exemples
Les exemples suivants montrent comment appeler cette opération dans le code de composant personnalisé.
Exemple : publier un message
package com.aws.greengrass.docs.samples.ipc;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreRequest;
import software.amazon.awssdk.aws.greengrass.model.QOS;
import java.nio.charset.StandardCharsets;
public class PublishToIoTCore {
public static void main(String[] args) {
String topic = args[0];
String message = args[1];
QOS qos = QOS.get(args[2]);
try (GreengrassCoreIPCClientV2 ipcClientV2 = GreengrassCoreIPCClientV2.builder().build()) {
ipcClientV2.publishToIoTCore(new PublishToIoTCoreRequest()
.withTopicName(topic)
.withPayload(message.getBytes(StandardCharsets.UTF_8))
.withQos(qos));
System.out.println("Successfully published to topic: " + topic);
} catch (Exception e) {
System.err.println("Exception occurred.");
e.printStackTrace();
System.exit(1);
}
}
}
SubscribeToIoTCore
Abonnez-vous aux messages MQTT à partir d'un AWS IoT Core sujet ou d'un filtre de sujet. Le logiciel AWS IoT Greengrass Core supprime les abonnements lorsque le composant atteint la fin de son cycle de vie.
Il s'agit d'une opération d'abonnement dans le cadre de laquelle vous vous abonnez à un flux de messages d'événements. Pour utiliser cette opération, définissez un gestionnaire de réponse au flux avec des fonctions qui gèrent les messages d'événements, les erreurs et la fermeture du flux. Pour de plus amples informations, veuillez consulter Abonnez-vous aux diffusions d'événements IPC.
Type de message d'événement : IoTCoreMessage
Demande
La demande de cette opération comporte les paramètres suivants :
topicName
(Python :topic_name
)-
Rubrique à laquelle vous souhaitez vous abonner. Vous pouvez utiliser les caractères génériques des rubriques MQTT (
#
et+
) pour vous abonner à plusieurs rubriques. qos
-
Les QoS MQTT à utiliser. Cette énumération possède
QOS
les valeurs suivantes :-
AT_MOST_ONCE
— QoS 0. Le message MQTT est délivré au plus une fois. -
AT_LEAST_ONCE
— QoS 1. Le message MQTT est délivré au moins une fois.
-
Réponse
La réponse de cette opération contient les informations suivantes :
messages
-
Le flux de messages MQTT. Cet objet contient
IoTCoreMessage
les informations suivantes :message
-
Le message MQTT. Cet objet contient
MQTTMessage
les informations suivantes :topicName
(Python :topic_name
)-
Sujet dans lequel le message a été publié.
payload
-
(Facultatif) La charge utile du message sous forme de blob.
Les fonctionnalités suivantes sont disponibles pour la version 2.10.0 et les versions ultérieures Noyau de Greengrass lors de l'utilisation de MQTT 5. Ces fonctionnalités sont ignorées lorsque vous utilisez MQTT 3.1.1. Le tableau suivant répertorie la version minimale du SDK de l' AWS IoT appareil que vous devez utiliser pour accéder à ces fonctionnalités.
SDK Version minimale Kit SDK des appareils AWS IoT pour Python v2 v1.15.0 Kit SDK des appareils AWS IoT pour Java v2 v1.13.0 Kit SDK des appareils AWS IoT pour C++ v2 v1.24.0 Kit SDK des appareils AWS IoT pour JavaScript v2 v1.13.0 payloadFormat
-
(Facultatif) Format de la charge utile du message. Si vous ne définissez pas le
payloadFormat
, le type est supposé êtreBYTES
. L'enum possède les valeurs suivantes :-
BYTES
— Le contenu de la charge utile est un blob binaire. -
UTF8
— Le contenu de la charge utile est une UTF8 chaîne de caractères.
-
retain
-
(Facultatif) Indique s'il faut définir l'option de conservation MQTT sur
true
lors de la publication. userProperties
-
(Facultatif) Liste des
UserProperty
objets spécifiques à l'application à envoyer. L'UserProperty
objet est défini comme suit :UserProperty: key: string value: string
messageExpiryIntervalSeconds
-
(Facultatif) Nombre de secondes avant que le message n'expire et ne soit supprimé par le serveur. Si cette valeur n'est pas définie, le message n'expire pas.
correlationData
-
(Facultatif) Informations ajoutées à la demande qui peuvent être utilisées pour associer une demande à une réponse.
responseTopic
-
(Facultatif) Rubrique à utiliser pour le message de réponse.
contentType
-
(Facultatif) Identifiant spécifique à l'application du type de contenu du message.
Exemples
Les exemples suivants montrent comment appeler cette opération dans le code de composant personnalisé.
Exemple : s'abonner à des messages
package com.aws.greengrass.docs.samples.ipc;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.SubscribeToIoTCoreResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.QOS;
import software.amazon.awssdk.aws.greengrass.model.IoTCoreMessage;
import software.amazon.awssdk.aws.greengrass.model.SubscribeToIoTCoreRequest;
import software.amazon.awssdk.aws.greengrass.model.SubscribeToIoTCoreResponse;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
public class SubscribeToIoTCore {
public static void main(String[] args) {
String topic = args[0];
QOS qos = QOS.get(args[1]);
Consumer<IoTCoreMessage> onStreamEvent = ioTCoreMessage ->
System.out.printf("Received new message on topic %s: %s%n",
ioTCoreMessage.getMessage().getTopicName(),
new String(ioTCoreMessage.getMessage().getPayload(), StandardCharsets.UTF_8));
Optional<Function<Throwable, Boolean>> onStreamError =
Optional.of(e -> {
System.err.println("Received a stream error.");
e.printStackTrace();
return false;
});
Optional<Runnable> onStreamClosed = Optional.of(() ->
System.out.println("Subscribe to IoT Core stream closed."));
try (GreengrassCoreIPCClientV2 ipcClientV2 = GreengrassCoreIPCClientV2.builder().build()) {
SubscribeToIoTCoreRequest request = new SubscribeToIoTCoreRequest()
.withTopicName(topic)
.withQos(qos);
GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToIoTCoreResponse, SubscribeToIoTCoreResponseHandler>
streamingResponse = ipcClientV2.subscribeToIoTCore(request, onStreamEvent, onStreamError, onStreamClosed);
streamingResponse.getResponse();
System.out.println("Successfully subscribed to topic: " + topic);
// Keep the main thread alive, or the process will exit.
while (true) {
Thread.sleep(10000);
}
// To stop subscribing, close the stream.
streamingResponse.getHandler().closeStream();
} catch (InterruptedException e) {
System.out.println("Subscribe interrupted.");
} catch (Exception e) {
System.err.println("Exception occurred.");
e.printStackTrace();
System.exit(1);
}
}
}
Exemples
Utilisez les exemples suivants pour apprendre à utiliser le service AWS IoT Core MQTT IPC dans vos composants.
L'exemple de recette suivant permet au composant de publier dans toutes les rubriques.
L'exemple d'application C++ suivant montre comment utiliser le service AWS IoT Core MQTT IPC pour publier des messages sur. AWS IoT Core
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
std::cout << "OnConnectCallback" << std::endl;
}
void OnDisconnectCallback(RpcError error) override {
std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
exit(-1);
}
bool OnErrorCallback(RpcError error) override {
std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
return true;
}
};
int main() {
String message("Hello from the Greengrass IPC MQTT publisher (C++).");
String topic("test/topic/cpp");
QOS qos = QOS_AT_LEAST_ONCE;
int timeout = 10;
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
while (true) {
PublishToIoTCoreRequest request;
Vector<uint8_t> messageData({message.begin(), message.end()});
request.SetTopicName(topic);
request.SetPayload(messageData);
request.SetQos(qos);
auto operation = ipcClient.NewPublishToIoTCore();
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (response) {
std::cout << "Successfully published to topic: " << topic << std::endl;
} else {
// An error occurred.
std::cout << "Failed to publish to topic: " << topic << std::endl;
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
} else {
std::cout << "RPC error: " << response.GetRpcError() << std::endl;
}
exit(-1);
}
std::this_thread::sleep_for(std::chrono::seconds(5));
}
return 0;
}
L'exemple de recette suivant permet au composant de s'abonner à toutes les rubriques.
L'exemple d'application C++ suivant montre comment utiliser le service AWS IoT Core MQTT IPC pour s'abonner à des messages provenant de. AWS IoT Core
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IoTCoreResponseHandler : public SubscribeToIoTCoreStreamHandler {
public:
virtual ~IoTCoreResponseHandler() {}
private:
void OnStreamEvent(IoTCoreMessage *response) override {
auto message = response->GetMessage();
if (message.has_value() && message.value().GetPayload().has_value()) {
auto messageBytes = message.value().GetPayload().value();
std::string messageString(messageBytes.begin(), messageBytes.end());
std::string messageTopic = message.value().GetTopicName().value().c_str();
std::cout << "Received new message on topic: " << messageTopic << std::endl;
std::cout << "Message: " << messageString << std::endl;
}
}
bool OnStreamError(OperationError *error) override {
std::cout << "Received an operation error: ";
if (error->GetMessage().has_value()) {
std::cout << error->GetMessage().value();
}
std::cout << std::endl;
return false; // Return true to close stream, false to keep stream open.
}
void OnStreamClosed() override {
std::cout << "Subscribe to IoT Core stream closed." << std::endl;
}
};
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
std::cout << "OnConnectCallback" << std::endl;
}
void OnDisconnectCallback(RpcError error) override {
std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
exit(-1);
}
bool OnErrorCallback(RpcError error) override {
std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
return true;
}
};
int main() {
String topic("test/topic/cpp");
QOS qos = QOS_AT_LEAST_ONCE;
int timeout = 10;
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
SubscribeToIoTCoreRequest request;
request.SetTopicName(topic);
request.SetQos(qos);
auto streamHandler = MakeShared<IoTCoreResponseHandler>(DefaultAllocator());
auto operation = ipcClient.NewSubscribeToIoTCore(streamHandler);
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (response) {
std::cout << "Successfully subscribed to topic: " << topic << std::endl;
} else {
// An error occurred.
std::cout << "Failed to subscribe to topic: " << topic << std::endl;
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
} else {
std::cout << "RPC error: " << response.GetRpcError() << std::endl;
}
exit(-1);
}
// Keep the main thread alive, or the process will exit.
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(10));
}
operation->Close();
return 0;
}