As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
O IPC serviço de AWS IoT Core MQTT mensagens permite que você envie e receba MQTT mensagens de e para AWS IoT Core. Os componentes podem publicar mensagens AWS IoT Core e se inscrever em tópicos para agir em MQTT mensagens de outras fontes. Para obter mais informações sobre a AWS IoT Core implementação doMQTT, consulte MQTTo Guia do AWS IoT Core desenvolvedor.
nota
Este IPC serviço MQTT de mensagens permite que você troque mensagens com AWS IoT Core. Para obter mais informações sobre como trocar mensagens entre componentes, consulte Publicar/assinar mensagens locais.
SDKVersões mínimas
A tabela a seguir lista as versões mínimas do AWS IoT Device SDK que você deve usar para publicar e assinar MQTT mensagens de e para AWS IoT Core.
SDK | Versão mínima |
---|---|
v1.2.10 |
|
v1.5.3 |
|
v1.17.0 |
|
v1.12.0 |
Autorização
Para usar AWS IoT Core MQTT mensagens em um componente personalizado, você deve definir políticas de autorização que permitam que seu componente envie e receba mensagens sobre tópicos. Para obter informações sobre a definição de políticas de autorização, consulte Autorizar componentes a realizar operações IPC.
As políticas de autorização para AWS IoT Core MQTT mensagens têm as seguintes propriedades.
IPCidentificador de serviço: aws.greengrass.ipc.mqttproxy
Operação | Descrição | Recursos |
---|---|---|
|
Permite que um componente publique mensagens AWS IoT Core sobre os MQTT tópicos que você especificar. |
Uma sequência de tópicos, como |
|
Permite que um componente assine mensagens dos AWS IoT Core tópicos que você especificar. |
Uma sequência de tópicos, como |
|
Permite que um componente publique e assine AWS IoT Core MQTT mensagens para os tópicos que você especificar. |
Uma sequência de tópicos, como |
MQTTcuringas nas políticas de AWS IoT Core MQTT autorização
Você pode usar MQTT curingas nas políticas de AWS IoT Core MQTT IPC autorização. Os componentes podem publicar e assinar tópicos que correspondam ao filtro de tópicos permitido em uma política de autorização. Por exemplo, se a política de autorização de um componente conceder acesso a test/topic/#
, o componente pode se inscrever no test/topic/#
, e publicar e se inscrever no test/topic/filter
.
Variáveis de receita nas políticas AWS IoT Core MQTT de autorização
Se você usa a versão 2.6.0 ou posterior do núcleo do Greengrass, pode usar a variável de fórmula {iot:thingName}
nas políticas de autorização. Esse recurso permite que você configure uma única política de autorização para um grupo de dispositivos principais, em que cada dispositivo principal pode acessar somente tópicos que contenham seu próprio nome. Por exemplo, você pode permitir que um componente acesse o seguinte recurso do tópico.
devices/{iot:thingName}/messages
Para ter mais informações, consulte Variáveis da fórmula e Usar variáveis de fórmula em atualizações de mesclagem.
Exemplos de política de autorização
Consulte os exemplos de política de autorização a seguir para configurar políticas de autorização para seus componentes.
exemplo Exemplo de política de autorização com acesso irrestrito
O exemplo de política de autorização a seguir permite que um componente publique e assine em todos os tópicos.
{
"accessControl": {
"aws.greengrass.ipc.mqttproxy": {
"com.example.MyIoTCorePubSubComponent
:mqttproxy:1": {
"policyDescription": "Allows access to publish/subscribe to all topics.",
"operations": [
"aws.greengrass#PublishToIoTCore",
"aws.greengrass#SubscribeToIoTCore"
],
"resources": [
"*"
]
}
}
}
}
exemplo Exemplo de política de autorização com acesso limitado
O exemplo de política de autorização a seguir permite que um componente publique e assine dois tópicos chamados factory/1/events
e factory/1/actions
.
{
"accessControl": {
"aws.greengrass.ipc.mqttproxy": {
"com.example.MyIoTCorePubSubComponent
:mqttproxy:1": {
"policyDescription": "Allows access to publish/subscribe to factory 1 topics.",
"operations": [
"aws.greengrass#PublishToIoTCore",
"aws.greengrass#SubscribeToIoTCore"
],
"resources": [
"factory/1/actions",
"factory/1/events"
]
}
}
}
}
exemplo Exemplo de política de autorização para um grupo de dispositivos principais
Importante
Este exemplo usa um recurso que está disponível para a versão 2.6.0 e posterior do componente de núcleo do Greengrass. O núcleo do Greengrass v2.6.0 adiciona suporte para a maioria das variáveis de fórmula, como {iot:thingName}
, em configurações de componentes.
O exemplo de política de autorização a seguir permite que um componente publique e assine um tópico que contém o nome do dispositivo principal que executa o componente.
{
"accessControl": {
"aws.greengrass.ipc.mqttproxy": {
"com.example.MyIoTCorePubSubComponent
:mqttproxy:1": {
"policyDescription": "Allows access to publish/subscribe to all topics.",
"operations": [
"aws.greengrass#PublishToIoTCore",
"aws.greengrass#SubscribeToIoTCore"
],
"resources": [
"factory/1/devices/{iot:thingName}/controls"
]
}
}
}
}
PublishToIoTCore
Publica uma MQTT mensagem AWS IoT Core sobre um tópico.
Quando você publica MQTT mensagens no AWS IoT Core, há uma cota de 100 transações por segundo. Se você exceder essa cota, as mensagens serão enfileiradas para processamento no dispositivo Greengrass. Há também uma cota de 512 Kb de dados por segundo e uma cota de 20.000 publicações por segundo em toda a conta (2.000 em algumas). Regiões da AWS Para obter mais informações sobre os limites do agente de MQTT mensagens em AWS IoT Core, consulte limites e cotas do agente de AWS IoT Core mensagens e do protocolo.
Se você exceder essas cotas, o dispositivo Greengrass limita a publicação de mensagens a. AWS IoT Core As mensagens são armazenadas em um spooler na memória. Por padrão, a memória alocada para o spooler é de 2,5 Mb. Se o spooler ficar cheio, novas mensagens serão rejeitadas. Você pode aumentar o tamanho do spooler. Para obter mais informações, consulte a Configuração documentação do Núcleo do Greengrass. Para evitar preencher o spooler e precisar aumentar a memória alocada, limite as solicitações de publicação a no máximo 100 solicitações por segundo.
Quando sua aplicação precisar enviar mensagens em uma taxa maior ou maiores, considere usar o Gerenciador de fluxos para enviar mensagens para o Kinesis Data Streams. O componente gerenciador de fluxos foi projetado para transferir dados de alto volume para a Nuvem AWS. Para obter mais informações, consulte Gerenciar fluxos de dados no nos dispositivos principais do Greengrass.
Solicitação
A solicitação dessa operação tem os seguintes parâmetros:
topicName
(Python:topic_name
)-
O tópico no qual publicar a mensagem.
qos
-
O MQTT QoS a ser usado. Esse enumerador,
QOS
, tem os seguintes valores:-
AT_MOST_ONCE
: QoS 0. A MQTT mensagem é entregue no máximo uma vez. -
AT_LEAST_ONCE
: QoS 1. A MQTT mensagem é entregue pelo menos uma vez.
-
payload
-
(Opcional) A carga útil da mensagem como um blob.
Os recursos a seguir estão disponíveis para a versão 2.10.0 e versões posteriores do Núcleo do Greengrass ao usar o 5. MQTT Esses recursos são ignorados quando você está usando o MQTT 3.1.1. A tabela a seguir lista a versão mínima do AWS IoT dispositivo SDK que você deve usar para acessar esses recursos.
SDK | Versão mínima |
---|---|
AWS IoT Device SDK for Python
v2 |
v1.15.0 |
AWS IoT Device SDK for Java
v2 |
v1.13.0 |
AWS IoT Device SDK for C++
v2 |
v1.24.0 |
AWS IoT Device SDK for JavaScript v2 |
v1.13.0 |
payloadFormat
-
(Opcional) O formato da carga útil da mensagem. Se você não definir o
payloadFormat
, presume-se que o tipo sejaBYTES
. O enumerador tem os seguintes valores:-
BYTES
: o conteúdo da carga útil é um blob binário. -
UTF8
— O conteúdo da carga é uma UTF8 sequência de caracteres.
-
retain
-
(Opcional) Indica se a opção de MQTT retenção deve ser definida como
true
ao publicar. userProperties
-
(Opcional) Uma lista de
UserProperty
objetos específicos da aplicação a serem enviados. O objetoUserProperty
é definido da seguinte maneira:UserProperty: key: string value: string
messageExpiryIntervalSeconds
-
(Opcional) O número de segundos antes de a mensagem expirar e ser excluída pelo servidor. Se esse valor não for definido, a mensagem não expira.
correlationData
-
(Opcional) Informações adicionadas à solicitação que podem ser usadas para associar uma solicitação a uma resposta.
responseTopic
-
(Opcional) O tópico que deve ser usado para a mensagem de resposta.
contentType
-
(Opcional) Um identificador específico da aplicação do tipo de conteúdo da mensagem.
Resposta
Essa operação não fornece nenhuma informação em sua resposta.
Exemplos
Os exemplos a seguir demonstram como chamar essa operação no código do componente personalizado.
exemplo Exemplo: publicar uma mensagem
package com.aws.greengrass.docs.samples.ipc;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreRequest;
import software.amazon.awssdk.aws.greengrass.model.QOS;
import java.nio.charset.StandardCharsets;
public class PublishToIoTCore {
public static void main(String[] args) {
String topic = args[0];
String message = args[1];
QOS qos = QOS.get(args[2]);
try (GreengrassCoreIPCClientV2 ipcClientV2 = GreengrassCoreIPCClientV2.builder().build()) {
ipcClientV2.publishToIoTCore(new PublishToIoTCoreRequest()
.withTopicName(topic)
.withPayload(message.getBytes(StandardCharsets.UTF_8))
.withQos(qos));
System.out.println("Successfully published to topic: " + topic);
} catch (Exception e) {
System.err.println("Exception occurred.");
e.printStackTrace();
System.exit(1);
}
}
}
SubscribeToIoTCore
Assine MQTT mensagens de um AWS IoT Core tópico ou filtro de tópicos. O software AWS IoT Greengrass principal remove as assinaturas quando o componente chega ao fim de seu ciclo de vida.
Essa operação é uma operação de assinatura em que você assina um fluxo de mensagens de eventos. Para usar essa operação, defina um manipulador de resposta de fluxo com funções que manipulam mensagens de eventos, erros e encerramento de fluxo. Para obter mais informações, consulte Inscreva-se em transmissões de IPC eventos.
Tipo de mensagem do evento: IoTCoreMessage
Solicitação
A solicitação dessa operação tem os seguintes parâmetros:
topicName
(Python:topic_name
)-
O tópico a ser assinado. Você pode usar curingas de MQTT tópicos (
#
e+
) para se inscrever em vários tópicos. qos
-
O MQTT QoS a ser usado. Esse enumerador,
QOS
, tem os seguintes valores:-
AT_MOST_ONCE
: QoS 0. A MQTT mensagem é entregue no máximo uma vez. -
AT_LEAST_ONCE
: QoS 1. A MQTT mensagem é entregue pelo menos uma vez.
-
Resposta
A resposta dessa operação tem as seguintes informações:
messages
-
O fluxo de MQTT mensagens. Esse objeto,
IoTCoreMessage
, contém as seguintes informações:message
-
A MQTT mensagem. Esse objeto,
MQTTMessage
, contém as seguintes informações:topicName
(Python:topic_name
)-
O tópico em que a mensagem foi publicada.
payload
-
(Opcional) A carga útil da mensagem como um blob.
Os recursos a seguir estão disponíveis para a versão 2.10.0 e versões posteriores do Núcleo do Greengrass ao usar o 5. MQTT Esses recursos são ignorados quando você está usando o MQTT 3.1.1. A tabela a seguir lista a versão mínima do AWS IoT dispositivo SDK que você deve usar para acessar esses recursos.
SDK Versão mínima AWS IoT Device SDK for Python v2 v1.15.0 AWS IoT Device SDK for Java v2 v1.13.0 AWS IoT Device SDK for C++ v2 v1.24.0 AWS IoT Device SDK for JavaScript v2 v1.13.0 payloadFormat
-
(Opcional) O formato da carga útil da mensagem. Se você não definir o
payloadFormat
, presume-se que o tipo sejaBYTES
. O enumerador tem os seguintes valores:-
BYTES
: o conteúdo da carga útil é um blob binário. -
UTF8
— O conteúdo da carga é uma UTF8 sequência de caracteres.
-
retain
-
(Opcional) Indica se a opção de MQTT retenção deve ser definida como
true
ao publicar. userProperties
-
(Opcional) Uma lista de
UserProperty
objetos específicos da aplicação a serem enviados. O objetoUserProperty
é definido da seguinte maneira:UserProperty: key: string value: string
messageExpiryIntervalSeconds
-
(Opcional) O número de segundos antes de a mensagem expirar e ser excluída pelo servidor. Se esse valor não for definido, a mensagem não expira.
correlationData
-
(Opcional) Informações adicionadas à solicitação que podem ser usadas para associar uma solicitação a uma resposta.
responseTopic
-
(Opcional) O tópico que deve ser usado para a mensagem de resposta.
contentType
-
(Opcional) Um identificador específico da aplicação do tipo de conteúdo da mensagem.
Exemplos
Os exemplos a seguir demonstram como chamar essa operação no código do componente personalizado.
exemplo Exemplo: inscrever-se em mensagens
package com.aws.greengrass.docs.samples.ipc;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.SubscribeToIoTCoreResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.QOS;
import software.amazon.awssdk.aws.greengrass.model.IoTCoreMessage;
import software.amazon.awssdk.aws.greengrass.model.SubscribeToIoTCoreRequest;
import software.amazon.awssdk.aws.greengrass.model.SubscribeToIoTCoreResponse;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
public class SubscribeToIoTCore {
public static void main(String[] args) {
String topic = args[0];
QOS qos = QOS.get(args[1]);
Consumer<IoTCoreMessage> onStreamEvent = ioTCoreMessage ->
System.out.printf("Received new message on topic %s: %s%n",
ioTCoreMessage.getMessage().getTopicName(),
new String(ioTCoreMessage.getMessage().getPayload(), StandardCharsets.UTF_8));
Optional<Function<Throwable, Boolean>> onStreamError =
Optional.of(e -> {
System.err.println("Received a stream error.");
e.printStackTrace();
return false;
});
Optional<Runnable> onStreamClosed = Optional.of(() ->
System.out.println("Subscribe to IoT Core stream closed."));
try (GreengrassCoreIPCClientV2 ipcClientV2 = GreengrassCoreIPCClientV2.builder().build()) {
SubscribeToIoTCoreRequest request = new SubscribeToIoTCoreRequest()
.withTopicName(topic)
.withQos(qos);
GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToIoTCoreResponse, SubscribeToIoTCoreResponseHandler>
streamingResponse = ipcClientV2.subscribeToIoTCore(request, onStreamEvent, onStreamError, onStreamClosed);
streamingResponse.getResponse();
System.out.println("Successfully subscribed to topic: " + topic);
// Keep the main thread alive, or the process will exit.
while (true) {
Thread.sleep(10000);
}
// To stop subscribing, close the stream.
streamingResponse.getHandler().closeStream();
} catch (InterruptedException e) {
System.out.println("Subscribe interrupted.");
} catch (Exception e) {
System.err.println("Exception occurred.");
e.printStackTrace();
System.exit(1);
}
}
}
Exemplos
Use os exemplos a seguir para aprender a usar o AWS IoT Core MQTT IPC serviço em seus componentes.
O exemplo de receita a seguir permite que o componente seja publicado em todos os tópicos.
O exemplo de aplicativo C++ a seguir demonstra como usar o AWS IoT Core MQTT IPC serviço para publicar mensagens no. AWS IoT Core
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
std::cout << "OnConnectCallback" << std::endl;
}
void OnDisconnectCallback(RpcError error) override {
std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
exit(-1);
}
bool OnErrorCallback(RpcError error) override {
std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
return true;
}
};
int main() {
String message("Hello from the Greengrass IPC MQTT publisher (C++).");
String topic("test/topic/cpp");
QOS qos = QOS_AT_LEAST_ONCE;
int timeout = 10;
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
while (true) {
PublishToIoTCoreRequest request;
Vector<uint8_t> messageData({message.begin(), message.end()});
request.SetTopicName(topic);
request.SetPayload(messageData);
request.SetQos(qos);
auto operation = ipcClient.NewPublishToIoTCore();
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (response) {
std::cout << "Successfully published to topic: " << topic << std::endl;
} else {
// An error occurred.
std::cout << "Failed to publish to topic: " << topic << std::endl;
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
} else {
std::cout << "RPC error: " << response.GetRpcError() << std::endl;
}
exit(-1);
}
std::this_thread::sleep_for(std::chrono::seconds(5));
}
return 0;
}
O exemplo de receita a seguir permite que o componente seja inscrito em todos os tópicos.
O exemplo de aplicativo C++ a seguir demonstra como usar o AWS IoT Core MQTT IPC serviço para assinar mensagens do. AWS IoT Core
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IoTCoreResponseHandler : public SubscribeToIoTCoreStreamHandler {
public:
virtual ~IoTCoreResponseHandler() {}
private:
void OnStreamEvent(IoTCoreMessage *response) override {
auto message = response->GetMessage();
if (message.has_value() && message.value().GetPayload().has_value()) {
auto messageBytes = message.value().GetPayload().value();
std::string messageString(messageBytes.begin(), messageBytes.end());
std::string messageTopic = message.value().GetTopicName().value().c_str();
std::cout << "Received new message on topic: " << messageTopic << std::endl;
std::cout << "Message: " << messageString << std::endl;
}
}
bool OnStreamError(OperationError *error) override {
std::cout << "Received an operation error: ";
if (error->GetMessage().has_value()) {
std::cout << error->GetMessage().value();
}
std::cout << std::endl;
return false; // Return true to close stream, false to keep stream open.
}
void OnStreamClosed() override {
std::cout << "Subscribe to IoT Core stream closed." << std::endl;
}
};
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
std::cout << "OnConnectCallback" << std::endl;
}
void OnDisconnectCallback(RpcError error) override {
std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
exit(-1);
}
bool OnErrorCallback(RpcError error) override {
std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
return true;
}
};
int main() {
String topic("test/topic/cpp");
QOS qos = QOS_AT_LEAST_ONCE;
int timeout = 10;
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
SubscribeToIoTCoreRequest request;
request.SetTopicName(topic);
request.SetQos(qos);
auto streamHandler = MakeShared<IoTCoreResponseHandler>(DefaultAllocator());
auto operation = ipcClient.NewSubscribeToIoTCore(streamHandler);
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (response) {
std::cout << "Successfully subscribed to topic: " << topic << std::endl;
} else {
// An error occurred.
std::cout << "Failed to subscribe to topic: " << topic << std::endl;
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
} else {
std::cout << "RPC error: " << response.GetRpcError() << std::endl;
}
exit(-1);
}
// Keep the main thread alive, or the process will exit.
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(10));
}
operation->Close();
return 0;
}