使用消息传递 API - AWS SimSpace Weaver

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用消息传递 API

消息传递 API 包含在SimSpace Weaver应用程序 SDK(最低版本 1.16.0)中。C++、Python 以及我们与虚幻引擎 5 和 Unity 的集成都支持消息传递。

有两个函数可以处理消息事务:SendMessageReceiveMessages。所有发送的消息都包含目的地和有效负载。ReceiveMessagesAPI 会返回应用程序入站消息队列中当前的消息列表。

C++

发送消息

AWS_WEAVERRUNTIME_API Result<void> SendMessage( Transaction& txn, const MessagePayload& payload, const MessageEndpoint& destination, MessageDeliveryType deliveryType = MessageDeliveryType::BestEffort ) noexcept;

接收消息

AWS_WEAVERRUNTIME_API Result<MessageList> ReceiveMessages( Transaction& txn) noexcept;
Python

发送消息

api.send_message( txn, # Transaction payload, # api.MessagePayload destination, # api.MessageDestination api.MessageDeliveryType.BestEffort # api.MessageDeliveryType )

接收消息

api.receive_messages( txn, # Transaction ) -> api.MessageList

发送消息

消息由交易(类似于其他 Weaver API 调用)、有效负载和目标组成。

消息负载

消息有效载荷是一种灵活的数据结构,最多 256 字节。我们建议将以下作为创建消息负载的最佳实践。

创建消息负载
  1. 创建定义消息内容的数据结构(例如 C++ struct 中的)。

  2. 创建包含要在消息中发送的值的消息负载。

  3. 创建MessagePayload对象。

消息目的地

消息的目的地由MessageEndpoint对象定义。这包括端点类型和终端节点 ID。目前唯一支持的端点类型是Partition,它允许您在模拟中将消息发送到其他分区。终端节点 ID 是目标目标的分区 ID。

您只能在一封邮件中提供 1 个目标地址。如果您想同时向多个分区发送消息,请创建并发送多条消息。

有关如何从某个位置解析消息端点的指导,请参阅处理消息传递时的提示

发送消息

您可以在创建目标和负载对象之后使用 SendMessage API。

C++
Api::SendMessage(transaction, payload, destination, MessageDeliveryType::BestEffort);
Python
api.send_message(txn, payload, destination, api.MessageDeliveryType.BestEffort)
发送消息的完整示例

以下示例演示了如何构造和发送通用消息。此示例发送了 16 条单独的消息。每条消息都包含一个值介于 0 和 15 之间的有效载荷以及当前的模拟滴答声。

C++
// Message struct definition struct MessageTickAndId { uint32_t id; uint32_t tick; }; Aws::WeaverRuntime::Result<void> SendMessages(Txn& txn) noexcept { // Fetch the destination MessageEndpoint with the endpoint resolver WEAVERRUNTIME_TRY( Api::MessageEndpoint destination, Api::Utils::MessageEndpointResolver::ResolveFromPosition( txn, "MySpatialSimulation", Api::Vector2F32 {231.3, 654.0} ) ); Log::Info("destination: ", destination); WEAVERRUNTIME_TRY(auto tick, Api::CurrentTick(txn)); uint16_t numSentMessages = 0; for (std::size_t i=0; i<16; i++) { // Create the message that'll be serialized into payload MessageTickAndId message {i, tick.value}; // Create the payload out of the struct const Api::MessagePayload& payload = Api::Utils::CreateMessagePayload( reinterpret_cast<const std::uint8_t*>(&message), sizeof(MessageTickAndId) ); // Send the payload to the destination Result<void> result = Api::SendMessage(txn, payload, destination); if (result.has_failure()) { // SendMessage has failure modes, log them auto error = result.as_failure().error(); std::cout<< "SendMessage failed, ErrorCode: " << error << std::endl; continue; } numSentMessages++; } std::cout << numSentMessages << " messages is sent to endpoint" << destination << std::endl; return Aws::WeaverRuntime::Success(); }
Python
# Message data class @dataclasses.dataclass class MessageTickAndId: tick: int = 0 id: int = 0 # send messages def _send_messages(self, txn): tick = api.current_tick(txn) num_messages_to_send = 16 # Fetch the destination MessageEndpoint with the endpoint resolver destination = api.utils.resolve_endpoint_from_domain_name_position( txn, "MySpatialSimulation", pos ) Log.debug("Destination_endpoint = %s", destination_endpoint) for id in range(num_messages_to_send): # Message struct that'll be serialized into payload message_tick_and_id = MessageTickAndId(id = id, tick = tick.value) # Create the payload out of the struct message_tick_and_id_data = struct.pack( '<ii', message_tick_and_id.id, message_tick_and_id.tick ) payload = api.MessagePayload(list(message_tick_and_id_data)) # Send the payload to the destination Log.debug("Sending message: %s, endpoint: %s", message_tick_and_id, destination ) api.send_message( txn, payload, destination, api.MessageDeliveryType.BestEffort ) Log.info("Sent %s messages to %s", num_messages_to_send, destination) return True

接收消息

SimSpace Weaver将消息传送到分区的入站消息队列中。使用 ReceiveMessages API 获取包含队列消息的MessageList对象。使用 ExtractMessage API 处理每条消息以获取消息数据。

C++
Result<void> ReceiveMessages(Txn& txn) noexcept { // Fetch all the messages sent to the partition owned by the app WEAVERRUNTIME_TRY(auto messages, Api::ReceiveMessages(txn)); std::cout << "Received" << messages.messages.size() << " messages" << std::endl; for (Api::Message& message : messages.messages) { std::cout << "Received message: " << message << std::endl; // Deserialize payload to the message struct const MessageTickAndId& receivedMessage = Api::Utils::ExtractMessage<MessageTickAndId>(message); std::cout << "Received MessageTickAndId, Id: " << receivedMessage.id <<", Tick: " << receivedMessage.tick << std::endl; } return Aws::WeaverRuntime::Success(); }
Python
# process incoming messages def _process_incoming_messages(self, txn): messages = api.receive_messages(txn) for message in messages: payload_list = message.payload.data payload_bytes = bytes(payload_list) message_tick_and_id_data_struct = MessageTickAndId(*struct.unpack('<ii', payload_bytes)) Log.debug("Received message. Header: %s, message: %s", message.header, message_tick_and_id_data_struct) Log.info("Received %s messages", len(messages)) return True

回复发件人

收到的每封邮件都包含一个邮件标头,其中包含有关邮件原始发件人的信息。你可以使用 message.header.source_endpoint 发送回复。

C++
Result<void> ReceiveMessages(Txn& txn) noexcept { // Fetch all the messages sent to the partition owned by the app WEAVERRUNTIME_TRY(auto messages, Api::ReceiveMessages(txn)); std::cout << "Received" << messages.messages.size() << " messages" << std::endl; for (Api::Message& message : messages.messages) { std::cout << "Received message: " << message << std::endl; // Deserialize payload to the message struct const MessageTickAndId& receivedMessage = Api::Utils::ExtractMessage<MessageTickAndId>(message); std::cout << "Received MessageTickAndId, Id: " << receivedMessage.id <<", Tick: " << receivedMessage.tick << std::endl; // Get the sender endpoint and payload to bounce the message back Api::MessageEndpoint& sender = message.header.source_endpoint; Api::MessagePayload& payload = message.payload; Api::SendMessage(txn, payload, sender); } return Aws::WeaverRuntime::Success(); }
Python
# process incoming messages def _process_incoming_messages(self, txn): messages = api.receive_messages(txn) for message in messages: payload_list = message.payload.data payload_bytes = bytes(payload_list) message_tick_and_id_data_struct = MessageTickAndId(*struct.unpack('<ii', payload_bytes)) Log.debug("Received message. Header: %s, message: %s", message.header, message_tick_and_id_data_struct) # Get the sender endpoint and payload # to bounce the message back sender = message.header.source_endpoint payload = payload_list api.send_message( txn, payload_list, sender, api.MessageDeliveryType.BestEffort Log.info("Received %s messages", len(messages)) return True