使用訊息傳遞 API - AWS SimSpace Weaver

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用訊息傳遞 API

消息傳遞 API 包含在SimSpace Weaver應用程序 SDK 中(最低版本 1.16.0)。消息傳遞支持 C ++,Python,並且我們與虛幻引擎 5 和統一的集成。

有兩個函數可以處理訊息交易:SendMessageReceiveMessages。所有發送的消息都包含目的地和有效負載。該 ReceiveMessages API 返回當前在應用程序的入站消息隊列中的消息列表。

C++

發送消息

AWS_WEAVERRUNTIME_API Result<void> SendMessage( Transaction& txn, const MessagePayload& payload, const MessageEndpoint& destination, MessageDeliveryType deliveryType = MessageDeliveryType::BestEffort ) noexcept;

接收訊息

AWS_WEAVERRUNTIME_API Result<MessageList> ReceiveMessages( Transaction& txn) noexcept;
Python

發送消息

api.send_message( txn, # Transaction payload, # api.MessagePayload destination, # api.MessageDestination api.MessageDeliveryType.BestEffort # api.MessageDeliveryType )

接收訊息

api.receive_messages( txn, # Transaction ) -> api.MessageList

傳送訊息

訊息是由交易 (類似於其他 Weaver API 呼叫)、承載和目的地所組成。

訊息承載

訊息承載是高達 256 個位元組的彈性資料結構。我們建議您採用下列作為建立訊息承載的最佳作法。

若要建立訊息承載
  1. 建立定義訊息內容的資料結構 (例如 C++)。struct

  2. 建立包含要在郵件中傳送之值的訊息承載。

  3. 建立MessagePayload物件。

訊息目的地

訊息的目的地由MessageEndpoint物件定義。這包括端點類型和端點 ID。目前支援的唯一端點類型為Partition,可讓您將訊息傳送至模擬中的其他分割區。端點 ID 是目標目標的分割區 ID。

您只能在一封郵件中提供 1 個目的地位址。如果您想同時將消息發送到 1 個以上的分區,請創建並發送多個消息。

如需如何從位置解析訊息端點的指引,請參閱使用消息傳遞時的提示

傳送訊息

您可以在建立目的地和承載物件之後使用 SendMessage API。

C++
Api::SendMessage(transaction, payload, destination, MessageDeliveryType::BestEffort);
Python
api.send_message(txn, payload, destination, api.MessageDeliveryType.BestEffort)
發送消息的完整示例

下面的例子演示了如何構建和發送一般消息。此範例會傳送 16 則個別訊息。每個訊息都包含一個值為 0 和 15 的有效負載,以及目前的模擬刻度。

C++
// Message struct definition struct MessageTickAndId { uint32_t id; uint32_t tick; }; Aws::WeaverRuntime::Result<void> SendMessages(Txn& txn) noexcept { // Fetch the destination MessageEndpoint with the endpoint resolver WEAVERRUNTIME_TRY( Api::MessageEndpoint destination, Api::Utils::MessageEndpointResolver::ResolveFromPosition( txn, "MySpatialSimulation", Api::Vector2F32 {231.3, 654.0} ) ); Log::Info("destination: ", destination); WEAVERRUNTIME_TRY(auto tick, Api::CurrentTick(txn)); uint16_t numSentMessages = 0; for (std::size_t i=0; i<16; i++) { // Create the message that'll be serialized into payload MessageTickAndId message {i, tick.value}; // Create the payload out of the struct const Api::MessagePayload& payload = Api::Utils::CreateMessagePayload( reinterpret_cast<const std::uint8_t*>(&message), sizeof(MessageTickAndId) ); // Send the payload to the destination Result<void> result = Api::SendMessage(txn, payload, destination); if (result.has_failure()) { // SendMessage has failure modes, log them auto error = result.as_failure().error(); std::cout<< "SendMessage failed, ErrorCode: " << error << std::endl; continue; } numSentMessages++; } std::cout << numSentMessages << " messages is sent to endpoint" << destination << std::endl; return Aws::WeaverRuntime::Success(); }
Python
# Message data class @dataclasses.dataclass class MessageTickAndId: tick: int = 0 id: int = 0 # send messages def _send_messages(self, txn): tick = api.current_tick(txn) num_messages_to_send = 16 # Fetch the destination MessageEndpoint with the endpoint resolver destination = api.utils.resolve_endpoint_from_domain_name_position( txn, "MySpatialSimulation", pos ) Log.debug("Destination_endpoint = %s", destination_endpoint) for id in range(num_messages_to_send): # Message struct that'll be serialized into payload message_tick_and_id = MessageTickAndId(id = id, tick = tick.value) # Create the payload out of the struct message_tick_and_id_data = struct.pack( '<ii', message_tick_and_id.id, message_tick_and_id.tick ) payload = api.MessagePayload(list(message_tick_and_id_data)) # Send the payload to the destination Log.debug("Sending message: %s, endpoint: %s", message_tick_and_id, destination ) api.send_message( txn, payload, destination, api.MessageDeliveryType.BestEffort ) Log.info("Sent %s messages to %s", num_messages_to_send, destination) return True

接收訊息

SimSpace Weaver將訊息傳遞至分割區的輸入訊息佇列。使用 ReceiveMessages API 取得包含佇列訊息的MessageList物件。使用 ExtractMessage API 處理每條消息以獲取消息數據。

C++
Result<void> ReceiveMessages(Txn& txn) noexcept { // Fetch all the messages sent to the partition owned by the app WEAVERRUNTIME_TRY(auto messages, Api::ReceiveMessages(txn)); std::cout << "Received" << messages.messages.size() << " messages" << std::endl; for (Api::Message& message : messages.messages) { std::cout << "Received message: " << message << std::endl; // Deserialize payload to the message struct const MessageTickAndId& receivedMessage = Api::Utils::ExtractMessage<MessageTickAndId>(message); std::cout << "Received MessageTickAndId, Id: " << receivedMessage.id <<", Tick: " << receivedMessage.tick << std::endl; } return Aws::WeaverRuntime::Success(); }
Python
# process incoming messages def _process_incoming_messages(self, txn): messages = api.receive_messages(txn) for message in messages: payload_list = message.payload.data payload_bytes = bytes(payload_list) message_tick_and_id_data_struct = MessageTickAndId(*struct.unpack('<ii', payload_bytes)) Log.debug("Received message. Header: %s, message: %s", message.header, message_tick_and_id_data_struct) Log.info("Received %s messages", len(messages)) return True

回覆寄件者

每個收到的郵件都包含一個郵件標頭,其中包含郵件原始寄件者的相關資訊 您可以使用消息。頭。源端點發送回复。

C++
Result<void> ReceiveMessages(Txn& txn) noexcept { // Fetch all the messages sent to the partition owned by the app WEAVERRUNTIME_TRY(auto messages, Api::ReceiveMessages(txn)); std::cout << "Received" << messages.messages.size() << " messages" << std::endl; for (Api::Message& message : messages.messages) { std::cout << "Received message: " << message << std::endl; // Deserialize payload to the message struct const MessageTickAndId& receivedMessage = Api::Utils::ExtractMessage<MessageTickAndId>(message); std::cout << "Received MessageTickAndId, Id: " << receivedMessage.id <<", Tick: " << receivedMessage.tick << std::endl; // Get the sender endpoint and payload to bounce the message back Api::MessageEndpoint& sender = message.header.source_endpoint; Api::MessagePayload& payload = message.payload; Api::SendMessage(txn, payload, sender); } return Aws::WeaverRuntime::Success(); }
Python
# process incoming messages def _process_incoming_messages(self, txn): messages = api.receive_messages(txn) for message in messages: payload_list = message.payload.data payload_bytes = bytes(payload_list) message_tick_and_id_data_struct = MessageTickAndId(*struct.unpack('<ii', payload_bytes)) Log.debug("Received message. Header: %s, message: %s", message.header, message_tick_and_id_data_struct) # Get the sender endpoint and payload # to bounce the message back sender = message.header.source_endpoint payload = payload_list api.send_message( txn, payload_list, sender, api.MessageDeliveryType.BestEffort Log.info("Received %s messages", len(messages)) return True