本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
FIFO 主題的 Amazon SNS程式碼範例
您可以使用下列程式碼範例,使用 Amazon SNSFIFO主題與 Amazon SQSFIFO佇列或標準佇列整合自動零件價格管理範例使用案例。
使用 AWS SDK
使用 AWS SDK,您可以將 Amazon 主題的FifoTopic
屬性設定為 來建立 Amazon SNSFIFO主題true
。您可以將 Amazon SQSFIFO佇列的FifoQueue
屬性設定為 來建立 Amazon 佇列true
。此外,您必須將.fifo
尾碼新增至每個FIFO資源的名稱。建立FIFO主題或佇列後,您無法將其轉換為標準主題或佇列。
下列程式碼範例會建立這些 FIFO 和標準佇列資源:
-
分發價格更新的 Amazon SNSFIFO主題
-
為批發和零售應用程式提供這些更新的 Amazon SQSFIFO佇列
-
存放記錄之分析應用程式的 Amazon SQS標準佇列,可查詢商業智慧 (BI)
-
將三個佇列連接至主題的 Amazon SNSFIFO訂閱
此範例設定訂閱的篩選政策。如果您藉由將訊息發佈到主題來測試範例,請確定發佈具有 business
屬性的訊息。指定 retail
或 wholesale
做為屬性值。否則,會篩選出訊息,而不會傳遞至訂閱佇列。如需詳細資訊,請參閱FIFO 主題的 Amazon SNS 訊息篩選。
- Java
-
- SDK 適用於 Java 2.x
-
此範例
測試可驗證每個佇列的訊息接收狀況。完整範例也會顯示存取政策的新增,並在最後刪除資源。
public class PriceUpdateExample {
public final static SnsClient snsClient = SnsClient.create();
public final static SqsClient sqsClient = SqsClient.create();
public static void main(String[] args) {
final String usage = "\n" +
"Usage: " +
" <topicName> <wholesaleQueueFifoName> <retailQueueFifoName> <analyticsQueueName>\n\n" +
"Where:\n" +
" fifoTopicName - The name of the FIFO topic that you want to create. \n\n" +
" wholesaleQueueARN - The name of a SQS FIFO queue that will be created for the wholesale consumer. \n\n"
+
" retailQueueARN - The name of a SQS FIFO queue that will created for the retail consumer. \n\n" +
" analyticsQueueARN - The name of a SQS standard queue that will be created for the analytics consumer. \n\n";
if (args.length != 4) {
System.out.println(usage);
System.exit(1);
}
final String fifoTopicName = args[0];
final String wholeSaleQueueName = args[1];
final String retailQueueName = args[2];
final String analyticsQueueName = args[3];
// For convenience, the QueueData class holds metadata about a queue: ARN, URL,
// name and type.
List<QueueData> queues = List.of(
new QueueData(wholeSaleQueueName, QueueType.FIFO),
new QueueData(retailQueueName, QueueType.FIFO),
new QueueData(analyticsQueueName, QueueType.Standard));
// Create queues.
createQueues(queues);
// Create a topic.
String topicARN = createFIFOTopic(fifoTopicName);
// Subscribe each queue to the topic.
subscribeQueues(queues, topicARN);
// Allow the newly created topic to send messages to the queues.
addAccessPolicyToQueuesFINAL(queues, topicARN);
// Publish a sample price update message with payload.
publishPriceUpdate(topicARN, "{\"product\": 214, \"price\": 79.99}", "Consumables");
// Clean up resources.
deleteSubscriptions(queues);
deleteQueues(queues);
deleteTopic(topicARN);
}
public static String createFIFOTopic(String topicName) {
try {
// Create a FIFO topic by using the SNS service client.
Map<String, String> topicAttributes = Map.of(
"FifoTopic", "true",
"ContentBasedDeduplication", "false");
CreateTopicRequest topicRequest = CreateTopicRequest.builder()
.name(topicName)
.attributes(topicAttributes)
.build();
CreateTopicResponse response = snsClient.createTopic(topicRequest);
String topicArn = response.topicArn();
System.out.println("The topic ARN is" + topicArn);
return topicArn;
} catch (SnsException e) {
System.err.println(e.awsErrorDetails().errorMessage());
System.exit(1);
}
return "";
}
public static void subscribeQueues(List<QueueData> queues, String topicARN) {
queues.forEach(queue -> {
SubscribeRequest subscribeRequest = SubscribeRequest.builder()
.topicArn(topicARN)
.endpoint(queue.queueARN)
.protocol("sqs")
.build();
// Subscribe to the endpoint by using the SNS service client.
// Only Amazon SQS queues can receive notifications from an Amazon SNS FIFO
// topic.
SubscribeResponse subscribeResponse = snsClient.subscribe(subscribeRequest);
System.out.println("The queue [" + queue.queueARN + "] subscribed to the topic [" + topicARN + "]");
queue.subscriptionARN = subscribeResponse.subscriptionArn();
});
}
public static void publishPriceUpdate(String topicArn, String payload, String groupId) {
try {
// Create and publish a message that updates the wholesale price.
String subject = "Price Update";
String dedupId = UUID.randomUUID().toString();
String attributeName = "business";
String attributeValue = "wholesale";
MessageAttributeValue msgAttValue = MessageAttributeValue.builder()
.dataType("String")
.stringValue(attributeValue)
.build();
Map<String, MessageAttributeValue> attributes = new HashMap<>();
attributes.put(attributeName, msgAttValue);
PublishRequest pubRequest = PublishRequest.builder()
.topicArn(topicArn)
.subject(subject)
.message(payload)
.messageGroupId(groupId)
.messageDeduplicationId(dedupId)
.messageAttributes(attributes)
.build();
final PublishResponse response = snsClient.publish(pubRequest);
System.out.println(response.messageId());
System.out.println(response.sequenceNumber());
System.out.println("Message was published to " + topicArn);
} catch (SnsException e) {
System.err.println(e.awsErrorDetails().errorMessage());
System.exit(1);
}
}
- Python
-
- SDK for Python (Boto3)
-
建立 Amazon SNSFIFO主題、訂閱 Amazon SQSFIFO和標準佇列至主題,以及發佈訊息至主題。
def usage_demo():
"""Shows how to subscribe queues to a FIFO topic."""
print("-" * 88)
print("Welcome to the `Subscribe queues to a FIFO topic` demo!")
print("-" * 88)
sns = boto3.resource("sns")
sqs = boto3.resource("sqs")
fifo_topic_wrapper = FifoTopicWrapper(sns)
sns_wrapper = SnsWrapper(sns)
prefix = "sqs-subscribe-demo-"
queues = set()
subscriptions = set()
wholesale_queue = sqs.create_queue(
QueueName=prefix + "wholesale.fifo",
Attributes={
"MaximumMessageSize": str(4096),
"ReceiveMessageWaitTimeSeconds": str(10),
"VisibilityTimeout": str(300),
"FifoQueue": str(True),
"ContentBasedDeduplication": str(True),
},
)
queues.add(wholesale_queue)
print(f"Created FIFO queue with URL: {wholesale_queue.url}.")
retail_queue = sqs.create_queue(
QueueName=prefix + "retail.fifo",
Attributes={
"MaximumMessageSize": str(4096),
"ReceiveMessageWaitTimeSeconds": str(10),
"VisibilityTimeout": str(300),
"FifoQueue": str(True),
"ContentBasedDeduplication": str(True),
},
)
queues.add(retail_queue)
print(f"Created FIFO queue with URL: {retail_queue.url}.")
analytics_queue = sqs.create_queue(QueueName=prefix + "analytics", Attributes={})
queues.add(analytics_queue)
print(f"Created standard queue with URL: {analytics_queue.url}.")
topic = fifo_topic_wrapper.create_fifo_topic("price-updates-topic.fifo")
print(f"Created FIFO topic: {topic.attributes['TopicArn']}.")
for q in queues:
fifo_topic_wrapper.add_access_policy(q, topic.attributes["TopicArn"])
print(f"Added access policies for topic: {topic.attributes['TopicArn']}.")
for q in queues:
sub = fifo_topic_wrapper.subscribe_queue_to_topic(
topic, q.attributes["QueueArn"]
)
subscriptions.add(sub)
print(f"Subscribed queues to topic: {topic.attributes['TopicArn']}.")
input("Press Enter to publish a message to the topic.")
message_id = fifo_topic_wrapper.publish_price_update(
topic, '{"product": 214, "price": 79.99}', "Consumables"
)
print(f"Published price update with message ID: {message_id}.")
# Clean up the subscriptions, queues, and topic.
input("Press Enter to clean up resources.")
for s in subscriptions:
sns_wrapper.delete_subscription(s)
sns_wrapper.delete_topic(topic)
for q in queues:
fifo_topic_wrapper.delete_queue(q)
print(f"Deleted subscriptions, queues, and topic.")
print("Thanks for watching!")
print("-" * 88)
class FifoTopicWrapper:
"""Encapsulates Amazon SNS FIFO topic and subscription functions."""
def __init__(self, sns_resource):
"""
:param sns_resource: A Boto3 Amazon SNS resource.
"""
self.sns_resource = sns_resource
def create_fifo_topic(self, topic_name):
"""
Create a FIFO topic.
Topic names must be made up of only uppercase and lowercase ASCII letters,
numbers, underscores, and hyphens, and must be between 1 and 256 characters long.
For a FIFO topic, the name must end with the .fifo suffix.
:param topic_name: The name for the topic.
:return: The new topic.
"""
try:
topic = self.sns_resource.create_topic(
Name=topic_name,
Attributes={
"FifoTopic": str(True),
"ContentBasedDeduplication": str(False),
},
)
logger.info("Created FIFO topic with name=%s.", topic_name)
return topic
except ClientError as error:
logger.exception("Couldn't create topic with name=%s!", topic_name)
raise error
@staticmethod
def add_access_policy(queue, topic_arn):
"""
Add the necessary access policy to a queue, so
it can receive messages from a topic.
:param queue: The queue resource.
:param topic_arn: The ARN of the topic.
:return: None.
"""
try:
queue.set_attributes(
Attributes={
"Policy": json.dumps(
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "test-sid",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "SQS:SendMessage",
"Resource": queue.attributes["QueueArn"],
"Condition": {
"ArnLike": {"aws:SourceArn": topic_arn}
},
}
],
}
)
}
)
logger.info("Added trust policy to the queue.")
except ClientError as error:
logger.exception("Couldn't add trust policy to the queue!")
raise error
@staticmethod
def subscribe_queue_to_topic(topic, queue_arn):
"""
Subscribe a queue to a topic.
:param topic: The topic resource.
:param queue_arn: The ARN of the queue.
:return: The subscription resource.
"""
try:
subscription = topic.subscribe(
Protocol="sqs",
Endpoint=queue_arn,
)
logger.info("The queue is subscribed to the topic.")
return subscription
except ClientError as error:
logger.exception("Couldn't subscribe queue to topic!")
raise error
@staticmethod
def publish_price_update(topic, payload, group_id):
"""
Compose and publish a message that updates the wholesale price.
:param topic: The topic to publish to.
:param payload: The message to publish.
:param group_id: The group ID for the message.
:return: The ID of the message.
"""
try:
att_dict = {"business": {"DataType": "String", "StringValue": "wholesale"}}
dedup_id = uuid.uuid4()
response = topic.publish(
Subject="Price Update",
Message=payload,
MessageAttributes=att_dict,
MessageGroupId=group_id,
MessageDeduplicationId=str(dedup_id),
)
message_id = response["MessageId"]
logger.info("Published message to topic %s.", topic.arn)
except ClientError as error:
logger.exception("Couldn't publish message to topic %s.", topic.arn)
raise error
return message_id
@staticmethod
def delete_queue(queue):
"""
Removes an SQS queue. When run against an AWS account, it can take up to
60 seconds before the queue is actually deleted.
:param queue: The queue to delete.
:return: None
"""
try:
queue.delete()
logger.info("Deleted queue with URL=%s.", queue.url)
except ClientError as error:
logger.exception("Couldn't delete queue with URL=%s!", queue.url)
raise error
- SAP ABAP
-
- SDK 適用於 SAP ABAP
-
建立FIFO主題、訂閱主題的 Amazon SQSFIFO佇列,以及發佈訊息至 Amazon SNS主題。
" Creates a FIFO topic. "
DATA lt_tpc_attributes TYPE /aws1/cl_snstopicattrsmap_w=>tt_topicattributesmap.
DATA ls_tpc_attributes TYPE /aws1/cl_snstopicattrsmap_w=>ts_topicattributesmap_maprow.
ls_tpc_attributes-key = 'FifoTopic'.
ls_tpc_attributes-value = NEW /aws1/cl_snstopicattrsmap_w( iv_value = 'true' ).
INSERT ls_tpc_attributes INTO TABLE lt_tpc_attributes.
TRY.
DATA(lo_create_result) = lo_sns->createtopic(
iv_name = iv_topic_name
it_attributes = lt_tpc_attributes
).
DATA(lv_topic_arn) = lo_create_result->get_topicarn( ).
ov_topic_arn = lv_topic_arn. " ov_topic_arn is returned for testing purposes. "
MESSAGE 'FIFO topic created' TYPE 'I'.
CATCH /aws1/cx_snstopiclimitexcdex.
MESSAGE 'Unable to create more topics. You have reached the maximum number of topics allowed.' TYPE 'E'.
ENDTRY.
" Subscribes an endpoint to an Amazon Simple Notification Service (Amazon SNS) topic. "
" Only Amazon Simple Queue Service (Amazon SQS) FIFO queues can be subscribed to an SNS FIFO topic. "
TRY.
DATA(lo_subscribe_result) = lo_sns->subscribe(
iv_topicarn = lv_topic_arn
iv_protocol = 'sqs'
iv_endpoint = iv_queue_arn
).
DATA(lv_subscription_arn) = lo_subscribe_result->get_subscriptionarn( ).
ov_subscription_arn = lv_subscription_arn. " ov_subscription_arn is returned for testing purposes. "
MESSAGE 'SQS queue was subscribed to SNS topic.' TYPE 'I'.
CATCH /aws1/cx_snsnotfoundexception.
MESSAGE 'Topic does not exist.' TYPE 'E'.
CATCH /aws1/cx_snssubscriptionlmte00.
MESSAGE 'Unable to create subscriptions. You have reached the maximum number of subscriptions allowed.' TYPE 'E'.
ENDTRY.
" Publish message to SNS topic. "
TRY.
DATA lt_msg_attributes TYPE /aws1/cl_snsmessageattrvalue=>tt_messageattributemap.
DATA ls_msg_attributes TYPE /aws1/cl_snsmessageattrvalue=>ts_messageattributemap_maprow.
ls_msg_attributes-key = 'Importance'.
ls_msg_attributes-value = NEW /aws1/cl_snsmessageattrvalue( iv_datatype = 'String' iv_stringvalue = 'High' ).
INSERT ls_msg_attributes INTO TABLE lt_msg_attributes.
DATA(lo_result) = lo_sns->publish(
iv_topicarn = lv_topic_arn
iv_message = 'The price of your mobile plan has been increased from $19 to $23'
iv_subject = 'Changes to mobile plan'
iv_messagegroupid = 'Update-2'
iv_messagededuplicationid = 'Update-2.1'
it_messageattributes = lt_msg_attributes
).
ov_message_id = lo_result->get_messageid( ). " ov_message_id is returned for testing purposes. "
MESSAGE 'Message was published to SNS topic.' TYPE 'I'.
CATCH /aws1/cx_snsnotfoundexception.
MESSAGE 'Topic does not exist.' TYPE 'E'.
ENDTRY.
從FIFO訂閱接收訊息
您現在可以在三個已訂閱的應用程式中接收價格更新。如 所示Amazon SNSFIFO主題範例使用案例,每個取用者應用程式的進入點是 Amazon SQS佇列,其對應的 AWS Lambda 函數可以自動輪詢。當 Amazon SQS佇列是 Lambda 函數的事件來源時,Lambda 會視需要擴展其輪詢器機群,以有效率地取用訊息。
如需詳細資訊,請參閱 AWS Lambda 開發人員指南 中的AWS Lambda 搭配使用 AmazonSQS。如需撰寫自己的佇列輪詢器的資訊,請參閱 Amazon Simple Queue Service 開發人員指南 和 FIFO Amazon Simple Queue Service 參考 中的 Amazon SQS標準和佇列建議。 ReceiveMessage API
使用 AWS CloudFormation
AWS CloudFormation 可讓您使用範本檔案,將 AWS 資源集合建立和設定為單一單位。本節含有可建立下列項目的範例範本:
-
分發價格更新的 Amazon SNSFIFO主題
-
為批發和零售應用程式提供這些更新的 Amazon SQSFIFO佇列
-
存放記錄之分析應用程式的 Amazon SQS標準佇列,可查詢商業智慧 (BI)
-
將三個佇列連接至主題的 Amazon SNSFIFO訂閱
-
指定訂閱者應用程式只接收所需價格更新的篩選政策
如果您藉由將訊息發佈至主題來測試此程式碼範例,請確定發佈具有 business
屬性的訊息。指定 retail
或 wholesale
做為屬性值。否則,會篩選出訊息,而不會傳遞至訂閱佇列。
{
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"PriceUpdatesTopic": {
"Type": "AWS::SNS::Topic",
"Properties": {
"TopicName": "PriceUpdatesTopic.fifo",
"FifoTopic": true,
"ContentBasedDeduplication": false,
"ArchivePolicy": {
"MessageRetentionPeriod": "30"
}
}
},
"WholesaleQueue": {
"Type": "AWS::SQS::Queue",
"Properties": {
"QueueName": "WholesaleQueue.fifo",
"FifoQueue": true,
"ContentBasedDeduplication": false
}
},
"RetailQueue": {
"Type": "AWS::SQS::Queue",
"Properties": {
"QueueName": "RetailQueue.fifo",
"FifoQueue": true,
"ContentBasedDeduplication": false
}
},
"AnalyticsQueue": {
"Type": "AWS::SQS::Queue",
"Properties": {
"QueueName": "AnalyticsQueue"
}
},
"WholesaleSubscription": {
"Type": "AWS::SNS::Subscription",
"Properties": {
"TopicArn": {
"Ref": "PriceUpdatesTopic"
},
"Endpoint": {
"Fn::GetAtt": [
"WholesaleQueue",
"Arn"
]
},
"Protocol": "sqs",
"RawMessageDelivery": "false",
"FilterPolicyScope": "MessageBody",
"FilterPolicy": {
"business": [
"wholesale"
]
}
}
},
"RetailSubscription": {
"Type": "AWS::SNS::Subscription",
"Properties": {
"TopicArn": {
"Ref": "PriceUpdatesTopic"
},
"Endpoint": {
"Fn::GetAtt": [
"RetailQueue",
"Arn"
]
},
"Protocol": "sqs",
"RawMessageDelivery": "false",
"FilterPolicyScope": "MessageBody",
"FilterPolicy": {
"business": [
"retail"
]
}
}
},
"AnalyticsSubscription": {
"Type": "AWS::SNS::Subscription",
"Properties": {
"TopicArn": {
"Ref": "PriceUpdatesTopic"
},
"Endpoint": {
"Fn::GetAtt": [
"AnalyticsQueue",
"Arn"
]
},
"Protocol": "sqs",
"RawMessageDelivery": "false"
}
},
"SalesQueuesPolicy": {
"Type": "AWS::SQS::QueuePolicy",
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "sns.amazonaws.com"
},
"Action": [
"sqs:SendMessage"
],
"Resource": "*",
"Condition": {
"ArnEquals": {
"aws:SourceArn": {
"Ref": "PriceUpdatesTopic"
}
}
}
}
]
},
"Queues": [
{
"Ref": "WholesaleQueue"
},
{
"Ref": "RetailQueue"
},
{
"Ref": "AnalyticsQueue"
}
]
}
}
}
}
如需使用 AWS CloudFormation 範本部署 AWS 資源的詳細資訊,請參閱 AWS CloudFormation 使用者指南 中的入門。