FIFO 主題的代碼範例 - Amazon Simple Notification Service

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

FIFO 主題的代碼範例

您可以使用下列代碼範例將使用 Amazon SNS FIFO 主題的汽車零件價格管理範例使用案例與 Amazon SQS FIFO 佇列或標準佇列整合。

使用 AWS 開發套件

使用 AWS 開發套件,您可以透過設定將其 FifoTopic 屬性設定為 true 建立一個 Amazon SNS FIFO 主題。您可以通過將其 FifoQueue 屬性設定為 true 建立一個 Amazon SQS FIFO 佇列。此外,您必須將 .fifo 後綴新增到每個 FIFO 資源的名稱。建立 FIFO 主題或佇列之後,您無法將其轉換為標準主題或佇列。

下列代碼範例會建立這些 FIFO 與標準佇列資源:

  • 分配價格更新的 Amazon SNS FIFO 主題

  • Amazon SQS FIFO 佇列提供這些更新給批發和零售應用程式

  • 用於儲存記錄的分析應用程式的 Amazon SQS 標準佇列,可查詢商業智慧 (BI)

  • 將三個佇列連線到主題的 Amazon SNS FIFO 訂閱

此範例設定訂閱的篩選政策。如果您藉由將訊息發佈到主題來測試範例,請確定發佈具有 business 屬性的訊息。指定 retailwholesale 做為屬性值。否則,會篩選出訊息,而不會傳遞至訂閱佇列。如需更多詳細資訊,請參閱 FIFO 主題的訊息篩選

Java
適用於 Java 2.x 的開發套件
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

此範例

  • 建立一個 Amazon SNS FIFO 主題、兩個 Amazon SQS FIFO 佇列和一個標準佇列。

  • 訂閱佇列到主題並向該主題發布訊息。

測試可驗證每個佇列的訊息接收狀況。完整範例也會顯示存取政策的新增,並在最後刪除資源。

public class PriceUpdateExample { public final static SnsClient snsClient = SnsClient.create(); public final static SqsClient sqsClient = SqsClient.create(); public static void main(String[] args) { final String usage = "\n" + "Usage: " + " <topicName> <wholesaleQueueFifoName> <retailQueueFifoName> <analyticsQueueName>\n\n" + "Where:\n" + " fifoTopicName - The name of the FIFO topic that you want to create. \n\n" + " wholesaleQueueARN - The name of a SQS FIFO queue that will be created for the wholesale consumer. \n\n" + " retailQueueARN - The name of a SQS FIFO queue that will created for the retail consumer. \n\n" + " analyticsQueueARN - The name of a SQS standard queue that will be created for the analytics consumer. \n\n"; if (args.length != 4) { System.out.println(usage); System.exit(1); } final String fifoTopicName = args[0]; final String wholeSaleQueueName = args[1]; final String retailQueueName = args[2]; final String analyticsQueueName = args[3]; // For convenience, the QueueData class holds metadata about a queue: ARN, URL, // name and type. List<QueueData> queues = List.of( new QueueData(wholeSaleQueueName, QueueType.FIFO), new QueueData(retailQueueName, QueueType.FIFO), new QueueData(analyticsQueueName, QueueType.Standard)); // Create queues. createQueues(queues); // Create a topic. String topicARN = createFIFOTopic(fifoTopicName); // Subscribe each queue to the topic. subscribeQueues(queues, topicARN); // Allow the newly created topic to send messages to the queues. addAccessPolicyToQueuesFINAL(queues, topicARN); // Publish a sample price update message with payload. publishPriceUpdate(topicARN, "{\"product\": 214, \"price\": 79.99}", "Consumables"); // Clean up resources. deleteSubscriptions(queues); deleteQueues(queues); deleteTopic(topicARN); } public static String createFIFOTopic(String topicName) { try { // Create a FIFO topic by using the SNS service client. Map<String, String> topicAttributes = Map.of( "FifoTopic", "true", "ContentBasedDeduplication", "false"); CreateTopicRequest topicRequest = CreateTopicRequest.builder() .name(topicName) .attributes(topicAttributes) .build(); CreateTopicResponse response = snsClient.createTopic(topicRequest); String topicArn = response.topicArn(); System.out.println("The topic ARN is" + topicArn); return topicArn; } catch (SnsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } return ""; } public static void subscribeQueues(List<QueueData> queues, String topicARN) { queues.forEach(queue -> { SubscribeRequest subscribeRequest = SubscribeRequest.builder() .topicArn(topicARN) .endpoint(queue.queueARN) .protocol("sqs") .build(); // Subscribe to the endpoint by using the SNS service client. // Only Amazon SQS queues can receive notifications from an Amazon SNS FIFO // topic. SubscribeResponse subscribeResponse = snsClient.subscribe(subscribeRequest); System.out.println("The queue [" + queue.queueARN + "] subscribed to the topic [" + topicARN + "]"); queue.subscriptionARN = subscribeResponse.subscriptionArn(); }); } public static void publishPriceUpdate(String topicArn, String payload, String groupId) { try { // Create and publish a message that updates the wholesale price. String subject = "Price Update"; String dedupId = UUID.randomUUID().toString(); String attributeName = "business"; String attributeValue = "wholesale"; MessageAttributeValue msgAttValue = MessageAttributeValue.builder() .dataType("String") .stringValue(attributeValue) .build(); Map<String, MessageAttributeValue> attributes = new HashMap<>(); attributes.put(attributeName, msgAttValue); PublishRequest pubRequest = PublishRequest.builder() .topicArn(topicArn) .subject(subject) .message(payload) .messageGroupId(groupId) .messageDeduplicationId(dedupId) .messageAttributes(attributes) .build(); final PublishResponse response = snsClient.publish(pubRequest); System.out.println(response.messageId()); System.out.println(response.sequenceNumber()); System.out.println("Message was published to " + topicArn); } catch (SnsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } }
Python
適用於 Python (Boto3) 的 SDK
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

建立 Amazon SNS FIFO 主題,訂閱 Amazon SQS FIFO 和標準佇列到該主題,並向該主題發布訊息。

def usage_demo(): """Shows how to subscribe queues to a FIFO topic.""" print("-" * 88) print("Welcome to the `Subscribe queues to a FIFO topic` demo!") print("-" * 88) sns = boto3.resource("sns") sqs = boto3.resource("sqs") fifo_topic_wrapper = FifoTopicWrapper(sns) sns_wrapper = SnsWrapper(sns) prefix = "sqs-subscribe-demo-" queues = set() subscriptions = set() wholesale_queue = sqs.create_queue( QueueName=prefix + "wholesale.fifo", Attributes={ "MaximumMessageSize": str(4096), "ReceiveMessageWaitTimeSeconds": str(10), "VisibilityTimeout": str(300), "FifoQueue": str(True), "ContentBasedDeduplication": str(True), }, ) queues.add(wholesale_queue) print(f"Created FIFO queue with URL: {wholesale_queue.url}.") retail_queue = sqs.create_queue( QueueName=prefix + "retail.fifo", Attributes={ "MaximumMessageSize": str(4096), "ReceiveMessageWaitTimeSeconds": str(10), "VisibilityTimeout": str(300), "FifoQueue": str(True), "ContentBasedDeduplication": str(True), }, ) queues.add(retail_queue) print(f"Created FIFO queue with URL: {retail_queue.url}.") analytics_queue = sqs.create_queue(QueueName=prefix + "analytics", Attributes={}) queues.add(analytics_queue) print(f"Created standard queue with URL: {analytics_queue.url}.") topic = fifo_topic_wrapper.create_fifo_topic("price-updates-topic.fifo") print(f"Created FIFO topic: {topic.attributes['TopicArn']}.") for q in queues: fifo_topic_wrapper.add_access_policy(q, topic.attributes["TopicArn"]) print(f"Added access policies for topic: {topic.attributes['TopicArn']}.") for q in queues: sub = fifo_topic_wrapper.subscribe_queue_to_topic( topic, q.attributes["QueueArn"] ) subscriptions.add(sub) print(f"Subscribed queues to topic: {topic.attributes['TopicArn']}.") input("Press Enter to publish a message to the topic.") message_id = fifo_topic_wrapper.publish_price_update( topic, '{"product": 214, "price": 79.99}', "Consumables" ) print(f"Published price update with message ID: {message_id}.") # Clean up the subscriptions, queues, and topic. input("Press Enter to clean up resources.") for s in subscriptions: sns_wrapper.delete_subscription(s) sns_wrapper.delete_topic(topic) for q in queues: fifo_topic_wrapper.delete_queue(q) print(f"Deleted subscriptions, queues, and topic.") print("Thanks for watching!") print("-" * 88) class FifoTopicWrapper: """Encapsulates Amazon SNS FIFO topic and subscription functions.""" def __init__(self, sns_resource): """ :param sns_resource: A Boto3 Amazon SNS resource. """ self.sns_resource = sns_resource def create_fifo_topic(self, topic_name): """ Create a FIFO topic. Topic names must be made up of only uppercase and lowercase ASCII letters, numbers, underscores, and hyphens, and must be between 1 and 256 characters long. For a FIFO topic, the name must end with the .fifo suffix. :param topic_name: The name for the topic. :return: The new topic. """ try: topic = self.sns_resource.create_topic( Name=topic_name, Attributes={ "FifoTopic": str(True), "ContentBasedDeduplication": str(False), }, ) logger.info("Created FIFO topic with name=%s.", topic_name) return topic except ClientError as error: logger.exception("Couldn't create topic with name=%s!", topic_name) raise error @staticmethod def add_access_policy(queue, topic_arn): """ Add the necessary access policy to a queue, so it can receive messages from a topic. :param queue: The queue resource. :param topic_arn: The ARN of the topic. :return: None. """ try: queue.set_attributes( Attributes={ "Policy": json.dumps( { "Version": "2012-10-17", "Statement": [ { "Sid": "test-sid", "Effect": "Allow", "Principal": {"AWS": "*"}, "Action": "SQS:SendMessage", "Resource": queue.attributes["QueueArn"], "Condition": { "ArnLike": {"aws:SourceArn": topic_arn} }, } ], } ) } ) logger.info("Added trust policy to the queue.") except ClientError as error: logger.exception("Couldn't add trust policy to the queue!") raise error @staticmethod def subscribe_queue_to_topic(topic, queue_arn): """ Subscribe a queue to a topic. :param topic: The topic resource. :param queue_arn: The ARN of the queue. :return: The subscription resource. """ try: subscription = topic.subscribe( Protocol="sqs", Endpoint=queue_arn, ) logger.info("The queue is subscribed to the topic.") return subscription except ClientError as error: logger.exception("Couldn't subscribe queue to topic!") raise error @staticmethod def publish_price_update(topic, payload, group_id): """ Compose and publish a message that updates the wholesale price. :param topic: The topic to publish to. :param payload: The message to publish. :param group_id: The group ID for the message. :return: The ID of the message. """ try: att_dict = {"business": {"DataType": "String", "StringValue": "wholesale"}} dedup_id = uuid.uuid4() response = topic.publish( Subject="Price Update", Message=payload, MessageAttributes=att_dict, MessageGroupId=group_id, MessageDeduplicationId=str(dedup_id), ) message_id = response["MessageId"] logger.info("Published message to topic %s.", topic.arn) except ClientError as error: logger.exception("Couldn't publish message to topic %s.", topic.arn) raise error return message_id @staticmethod def delete_queue(queue): """ Removes an SQS queue. When run against an AWS account, it can take up to 60 seconds before the queue is actually deleted. :param queue: The queue to delete. :return: None """ try: queue.delete() logger.info("Deleted queue with URL=%s.", queue.url) except ClientError as error: logger.exception("Couldn't delete queue with URL=%s!", queue.url) raise error
  • 如需 API 的詳細資訊,請參閱《適用於 Python (Boto3) 的 AWS SDK API 參考資料》中的下列主題。

SAP ABAP
適用於 SAP ABAP 的開發套件
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

建立 FIFO 主題、將 Amazon SQS FIFO 佇列訂閱至主題,然後將訊息發佈至 Amazon SNS 主題。

" Creates a FIFO topic. " DATA lt_tpc_attributes TYPE /aws1/cl_snstopicattrsmap_w=>tt_topicattributesmap. DATA ls_tpc_attributes TYPE /aws1/cl_snstopicattrsmap_w=>ts_topicattributesmap_maprow. ls_tpc_attributes-key = 'FifoTopic'. ls_tpc_attributes-value = NEW /aws1/cl_snstopicattrsmap_w( iv_value = 'true' ). INSERT ls_tpc_attributes INTO TABLE lt_tpc_attributes. TRY. DATA(lo_create_result) = lo_sns->createtopic( iv_name = iv_topic_name it_attributes = lt_tpc_attributes ). DATA(lv_topic_arn) = lo_create_result->get_topicarn( ). ov_topic_arn = lv_topic_arn. " ov_topic_arn is returned for testing purposes. " MESSAGE 'FIFO topic created' TYPE 'I'. CATCH /aws1/cx_snstopiclimitexcdex. MESSAGE 'Unable to create more topics. You have reached the maximum number of topics allowed.' TYPE 'E'. ENDTRY. " Subscribes an endpoint to an Amazon Simple Notification Service (Amazon SNS) topic. " " Only Amazon Simple Queue Service (Amazon SQS) FIFO queues can be subscribed to an SNS FIFO topic. " TRY. DATA(lo_subscribe_result) = lo_sns->subscribe( iv_topicarn = lv_topic_arn iv_protocol = 'sqs' iv_endpoint = iv_queue_arn ). DATA(lv_subscription_arn) = lo_subscribe_result->get_subscriptionarn( ). ov_subscription_arn = lv_subscription_arn. " ov_subscription_arn is returned for testing purposes. " MESSAGE 'SQS queue was subscribed to SNS topic.' TYPE 'I'. CATCH /aws1/cx_snsnotfoundexception. MESSAGE 'Topic does not exist.' TYPE 'E'. CATCH /aws1/cx_snssubscriptionlmte00. MESSAGE 'Unable to create subscriptions. You have reached the maximum number of subscriptions allowed.' TYPE 'E'. ENDTRY. " Publish message to SNS topic. " TRY. DATA lt_msg_attributes TYPE /aws1/cl_snsmessageattrvalue=>tt_messageattributemap. DATA ls_msg_attributes TYPE /aws1/cl_snsmessageattrvalue=>ts_messageattributemap_maprow. ls_msg_attributes-key = 'Importance'. ls_msg_attributes-value = NEW /aws1/cl_snsmessageattrvalue( iv_datatype = 'String' iv_stringvalue = 'High' ). INSERT ls_msg_attributes INTO TABLE lt_msg_attributes. DATA(lo_result) = lo_sns->publish( iv_topicarn = lv_topic_arn iv_message = 'The price of your mobile plan has been increased from $19 to $23' iv_subject = 'Changes to mobile plan' iv_messagegroupid = 'Update-2' iv_messagededuplicationid = 'Update-2.1' it_messageattributes = lt_msg_attributes ). ov_message_id = lo_result->get_messageid( ). " ov_message_id is returned for testing purposes. " MESSAGE 'Message was published to SNS topic.' TYPE 'I'. CATCH /aws1/cx_snsnotfoundexception. MESSAGE 'Topic does not exist.' TYPE 'E'. ENDTRY.
  • 如需 API 詳細資訊,請參閱《適用於 SAP ABAP 的 AWS SDK API 參考》中的下列主題。

接收來自 FIFO 訂閱的訊息

您現在可以在三個已訂閱的應用程式中接收價格更新。如 FIFO 主題範例使用案例 所示,每個消費者應用程式的入口點是 Amazon SQS 佇列,其對應的 AWS Lambda 函數可以自動輪詢。當 Amazon SQS 佇列是 Lambda 函數的事件來源時,Lambda 會視需要調整其輪詢器機群,以有效率地使用訊息。

如需更多詳細資訊,請參閱 AWS Lambda 開發人員指南中的將 AWS Lambda 與 Amazon SQS 搭配使用。如需撰寫自己佇列輪詢程式的相關資訊,請參閱 Amazon Simple Queue Service 開發者指南中的Amazon SQS 標準和 FIFO 佇列的建議Amazon Simple Queue Service API 參考中的 ReceiveMessage

使用 AWS CloudFormation

AWS CloudFormation 可讓您使用範本檔案,並將一系列的 AWS 資源一起建立和設定為單一單位。本節含有可建立下列項目的範例範本:

  • 分配價格更新的 Amazon SNS FIFO 主題

  • Amazon SQS FIFO 佇列提供這些更新給批發和零售應用程式

  • 用於儲存記錄的分析應用程式的 Amazon SQS 標準佇列,可查詢商業智慧 (BI)

  • 將三個佇列連線到主題的 Amazon SNS FIFO 訂閱

  • 指定訂閱者應用程式只接收所需價格更新的篩選政策

注意

如果您藉由將訊息發佈至主題來測試此程式碼範例,請確定發佈具有 business 屬性的訊息。指定 retailwholesale 做為屬性值。否則,會篩選出訊息,而不會傳遞至訂閱佇列。

{ "AWSTemplateFormatVersion": "2010-09-09", "Resources": { "PriceUpdatesTopic": { "Type": "AWS::SNS::Topic", "Properties": { "TopicName": "PriceUpdatesTopic.fifo", "FifoTopic": true, "ContentBasedDeduplication": false, "ArchivePolicy": { "MessageRetentionPeriod": "30" } } }, "WholesaleQueue": { "Type": "AWS::SQS::Queue", "Properties": { "QueueName": "WholesaleQueue.fifo", "FifoQueue": true, "ContentBasedDeduplication": false } }, "RetailQueue": { "Type": "AWS::SQS::Queue", "Properties": { "QueueName": "RetailQueue.fifo", "FifoQueue": true, "ContentBasedDeduplication": false } }, "AnalyticsQueue": { "Type": "AWS::SQS::Queue", "Properties": { "QueueName": "AnalyticsQueue" } }, "WholesaleSubscription": { "Type": "AWS::SNS::Subscription", "Properties": { "TopicArn": { "Ref": "PriceUpdatesTopic" }, "Endpoint": { "Fn::GetAtt": [ "WholesaleQueue", "Arn" ] }, "Protocol": "sqs", "RawMessageDelivery": "false", "FilterPolicyScope": "MessageBody", "FilterPolicy": { "business": [ "wholesale" ] } } }, "RetailSubscription": { "Type": "AWS::SNS::Subscription", "Properties": { "TopicArn": { "Ref": "PriceUpdatesTopic" }, "Endpoint": { "Fn::GetAtt": [ "RetailQueue", "Arn" ] }, "Protocol": "sqs", "RawMessageDelivery": "false", "FilterPolicyScope": "MessageBody", "FilterPolicy": { "business": [ "retail" ] } } }, "AnalyticsSubscription": { "Type": "AWS::SNS::Subscription", "Properties": { "TopicArn": { "Ref": "PriceUpdatesTopic" }, "Endpoint": { "Fn::GetAtt": [ "AnalyticsQueue", "Arn" ] }, "Protocol": "sqs", "RawMessageDelivery": "false" } }, "SalesQueuesPolicy": { "Type": "AWS::SQS::QueuePolicy", "Properties": { "PolicyDocument": { "Statement": [ { "Effect": "Allow", "Principal": { "Service": "sns.amazonaws.com" }, "Action": [ "sqs:SendMessage" ], "Resource": "*", "Condition": { "ArnEquals": { "aws:SourceArn": { "Ref": "PriceUpdatesTopic" } } } } ] }, "Queues": [ { "Ref": "WholesaleQueue" }, { "Ref": "RetailQueue" }, { "Ref": "AnalyticsQueue" } ] } } } }

如需有關使用 AWS CloudFormation 範本部署 AWS 資源的詳細資訊,請參閱 AWS CloudFormation 使用者指南中的入門