使用 SNS for Python (Boto3) 的 Amazon SDK 範例 - AWS SDK 程式碼範例

文件 AWS SDK AWS 範例 SDK 儲存庫中有更多可用的 GitHub 範例。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 SNS for Python (Boto3) 的 Amazon SDK 範例

下列程式碼範例示範如何搭配 Amazon AWS SDK for Python (Boto3) SNS 使用 來執行動作和實作常見案例。

Actions 是大型程式的程式碼摘錄,必須在內容中執行。雖然 動作會示範如何呼叫個別服務函數,但您可以在其相關案例中查看內容中的動作。

案例是程式碼範例,示範如何透過呼叫服務內的多個函數或與其他函數結合來完成特定任務 AWS 服務。

每個範例都包含完整原始程式碼的連結,您可以在其中找到如何在內容中設定和執行程式碼的指示。

動作

下列程式碼範例示範如何使用 CreateTopic

Python 的 SDK (Boto3)
注意

還有更多 on GitHub。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

class SnsWrapper: """Encapsulates Amazon SNS topic and subscription functions.""" def __init__(self, sns_resource): """ :param sns_resource: A Boto3 Amazon SNS resource. """ self.sns_resource = sns_resource def create_topic(self, name): """ Creates a notification topic. :param name: The name of the topic to create. :return: The newly created topic. """ try: topic = self.sns_resource.create_topic(Name=name) logger.info("Created topic %s with ARN %s.", name, topic.arn) except ClientError: logger.exception("Couldn't create topic %s.", name) raise else: return topic
  • 如需 API 詳細資訊,請參閱 CreateTopic AWS SDK for Python (Boto3) Word 參考中的 API

下列程式碼範例示範如何使用 DeleteTopic

Python 的 SDK (Boto3)
注意

還有更多 on GitHub。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

class SnsWrapper: """Encapsulates Amazon SNS topic and subscription functions.""" def __init__(self, sns_resource): """ :param sns_resource: A Boto3 Amazon SNS resource. """ self.sns_resource = sns_resource @staticmethod def delete_topic(topic): """ Deletes a topic. All subscriptions to the topic are also deleted. """ try: topic.delete() logger.info("Deleted topic %s.", topic.arn) except ClientError: logger.exception("Couldn't delete topic %s.", topic.arn) raise
  • 如需 API 詳細資訊,請參閱 DeleteTopic AWS SDK for Python (Boto3) Word 參考中的 API

下列程式碼範例示範如何使用 ListSubscriptions

Python 的 SDK (Boto3)
注意

還有更多 on GitHub。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

class SnsWrapper: """Encapsulates Amazon SNS topic and subscription functions.""" def __init__(self, sns_resource): """ :param sns_resource: A Boto3 Amazon SNS resource. """ self.sns_resource = sns_resource def list_subscriptions(self, topic=None): """ Lists subscriptions for the current account, optionally limited to a specific topic. :param topic: When specified, only subscriptions to this topic are returned. :return: An iterator that yields the subscriptions. """ try: if topic is None: subs_iter = self.sns_resource.subscriptions.all() else: subs_iter = topic.subscriptions.all() logger.info("Got subscriptions.") except ClientError: logger.exception("Couldn't get subscriptions.") raise else: return subs_iter
  • 如需 API 詳細資訊,請參閱 ListSubscriptions AWS SDK for Python (Boto3) Word 參考中的 API

下列程式碼範例示範如何使用 ListTopics

Python 的 SDK (Boto3)
注意

還有更多 on GitHub。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

class SnsWrapper: """Encapsulates Amazon SNS topic and subscription functions.""" def __init__(self, sns_resource): """ :param sns_resource: A Boto3 Amazon SNS resource. """ self.sns_resource = sns_resource def list_topics(self): """ Lists topics for the current account. :return: An iterator that yields the topics. """ try: topics_iter = self.sns_resource.topics.all() logger.info("Got topics.") except ClientError: logger.exception("Couldn't get topics.") raise else: return topics_iter
  • 如需 API 詳細資訊,請參閱 ListTopics AWS SDK for Python (Boto3) Word 參考中的 API

下列程式碼範例示範如何使用 Publish

Python 的 SDK (Boto3)
注意

還有更多 on GitHub。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

發佈具有屬性的郵件,以便訂閱可以根據屬性進行篩選。

class SnsWrapper: """Encapsulates Amazon SNS topic and subscription functions.""" def __init__(self, sns_resource): """ :param sns_resource: A Boto3 Amazon SNS resource. """ self.sns_resource = sns_resource @staticmethod def publish_message(topic, message, attributes): """ Publishes a message, with attributes, to a topic. Subscriptions can be filtered based on message attributes so that a subscription receives messages only when specified attributes are present. :param topic: The topic to publish to. :param message: The message to publish. :param attributes: The key-value attributes to attach to the message. Values must be either `str` or `bytes`. :return: The ID of the message. """ try: att_dict = {} for key, value in attributes.items(): if isinstance(value, str): att_dict[key] = {"DataType": "String", "StringValue": value} elif isinstance(value, bytes): att_dict[key] = {"DataType": "Binary", "BinaryValue": value} response = topic.publish(Message=message, MessageAttributes=att_dict) message_id = response["MessageId"] logger.info( "Published message with attributes %s to topic %s.", attributes, topic.arn, ) except ClientError: logger.exception("Couldn't publish message to topic %s.", topic.arn) raise else: return message_id

發佈根據訂閱者的通訊協定採用不同形式的訊息。

class SnsWrapper: """Encapsulates Amazon SNS topic and subscription functions.""" def __init__(self, sns_resource): """ :param sns_resource: A Boto3 Amazon SNS resource. """ self.sns_resource = sns_resource @staticmethod def publish_multi_message( topic, subject, default_message, sms_message, email_message ): """ Publishes a multi-format message to a topic. A multi-format message takes different forms based on the protocol of the subscriber. For example, an SMS subscriber might receive a short version of the message while an email subscriber could receive a longer version. :param topic: The topic to publish to. :param subject: The subject of the message. :param default_message: The default version of the message. This version is sent to subscribers that have protocols that are not otherwise specified in the structured message. :param sms_message: The version of the message sent to SMS subscribers. :param email_message: The version of the message sent to email subscribers. :return: The ID of the message. """ try: message = { "default": default_message, "sms": sms_message, "email": email_message, } response = topic.publish( Message=json.dumps(message), Subject=subject, MessageStructure="json" ) message_id = response["MessageId"] logger.info("Published multi-format message to topic %s.", topic.arn) except ClientError: logger.exception("Couldn't publish message to topic %s.", topic.arn) raise else: return message_id
  • 如需 API 詳細資訊,請參閱在 Word for Python (Boto3) 中發佈 Word 參考。 AWS SDK API

下列程式碼範例示範如何使用 SetSubscriptionAttributes

Python 的 SDK (Boto3)
注意

還有更多 on GitHub。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

class SnsWrapper: """Encapsulates Amazon SNS topic and subscription functions.""" def __init__(self, sns_resource): """ :param sns_resource: A Boto3 Amazon SNS resource. """ self.sns_resource = sns_resource @staticmethod def add_subscription_filter(subscription, attributes): """ Adds a filter policy to a subscription. A filter policy is a key and a list of values that are allowed. When a message is published, it must have an attribute that passes the filter or it will not be sent to the subscription. :param subscription: The subscription the filter policy is attached to. :param attributes: A dictionary of key-value pairs that define the filter. """ try: att_policy = {key: [value] for key, value in attributes.items()} subscription.set_attributes( AttributeName="FilterPolicy", AttributeValue=json.dumps(att_policy) ) logger.info("Added filter to subscription %s.", subscription.arn) except ClientError: logger.exception( "Couldn't add filter to subscription %s.", subscription.arn ) raise

下列程式碼範例示範如何使用 Subscribe

Python 的 SDK (Boto3)
注意

還有更多 on GitHub。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

訂閱主題的電子郵件地址。

class SnsWrapper: """Encapsulates Amazon SNS topic and subscription functions.""" def __init__(self, sns_resource): """ :param sns_resource: A Boto3 Amazon SNS resource. """ self.sns_resource = sns_resource @staticmethod def subscribe(topic, protocol, endpoint): """ Subscribes an endpoint to the topic. Some endpoint types, such as email, must be confirmed before their subscriptions are active. When a subscription is not confirmed, its Amazon Resource Number (ARN) is set to 'PendingConfirmation'. :param topic: The topic to subscribe to. :param protocol: The protocol of the endpoint, such as 'sms' or 'email'. :param endpoint: The endpoint that receives messages, such as a phone number (in E.164 format) for SMS messages, or an email address for email messages. :return: The newly added subscription. """ try: subscription = topic.subscribe( Protocol=protocol, Endpoint=endpoint, ReturnSubscriptionArn=True ) logger.info("Subscribed %s %s to topic %s.", protocol, endpoint, topic.arn) except ClientError: logger.exception( "Couldn't subscribe %s %s to topic %s.", protocol, endpoint, topic.arn ) raise else: return subscription

下列程式碼範例示範如何使用 Unsubscribe

Python 的 SDK (Boto3)
注意

還有更多 on GitHub。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

class SnsWrapper: """Encapsulates Amazon SNS topic and subscription functions.""" def __init__(self, sns_resource): """ :param sns_resource: A Boto3 Amazon SNS resource. """ self.sns_resource = sns_resource @staticmethod def delete_subscription(subscription): """ Unsubscribes and deletes a subscription. """ try: subscription.delete() logger.info("Deleted subscription %s.", subscription.arn) except ClientError: logger.exception("Couldn't delete subscription %s.", subscription.arn) raise
  • 如需 API 詳細資訊,請參閱在 AWS SDK for Python (Boto3) API 參考取消訂閱

案例

下列程式碼範例示範如何透過互動式應用程式探索 Amazon Textract 輸出。

Python 的 SDK (Boto3)

示範如何使用 AWS SDK for Python (Boto3) 搭配 Amazon Textract 來偵測文件映像中的文字、表單和資料表元素。輸入影像和 Amazon Textract 輸出會顯示在 Tkinter 應用程式中,可讓您探索偵測到的元素。

  • 將文件影像提交到 Amazon Textract,並探索偵測到元素的輸出。

  • 將影像直接傳送至 Amazon Textract 或透過 Amazon Simple Storage Service (Amazon S3) 儲存貯體。

  • 使用非同步 APIs 啟動任務,在任務完成時將通知發佈至 Amazon Simple Notification Service (Amazon SNS) 主題。

  • 查詢任務完成訊息的 Amazon Simple Queue Service (Amazon SQS) 佇列,並顯示結果。

如需完整的原始程式碼和如何設定和執行的指示,請參閱 GitHub 上的完整範例。

此範例中使用的服務
  • Amazon S3

  • Amazon SNS

  • Amazon SQS

  • Amazon Textract

下列程式碼範例示範如何建立和發佈至 FIFO Amazon SNS 主題。

Python 的 SDK (Boto3)
注意

還有更多 on GitHub。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

建立 Amazon SNS FIFO主題、將 Amazon SQS FIFO和標準佇列訂閱至主題,以及發佈訊息至主題。

def usage_demo(): """Shows how to subscribe queues to a FIFO topic.""" print("-" * 88) print("Welcome to the `Subscribe queues to a FIFO topic` demo!") print("-" * 88) sns = boto3.resource("sns") sqs = boto3.resource("sqs") fifo_topic_wrapper = FifoTopicWrapper(sns) sns_wrapper = SnsWrapper(sns) prefix = "sqs-subscribe-demo-" queues = set() subscriptions = set() wholesale_queue = sqs.create_queue( QueueName=prefix + "wholesale.fifo", Attributes={ "MaximumMessageSize": str(4096), "ReceiveMessageWaitTimeSeconds": str(10), "VisibilityTimeout": str(300), "FifoQueue": str(True), "ContentBasedDeduplication": str(True), }, ) queues.add(wholesale_queue) print(f"Created FIFO queue with URL: {wholesale_queue.url}.") retail_queue = sqs.create_queue( QueueName=prefix + "retail.fifo", Attributes={ "MaximumMessageSize": str(4096), "ReceiveMessageWaitTimeSeconds": str(10), "VisibilityTimeout": str(300), "FifoQueue": str(True), "ContentBasedDeduplication": str(True), }, ) queues.add(retail_queue) print(f"Created FIFO queue with URL: {retail_queue.url}.") analytics_queue = sqs.create_queue(QueueName=prefix + "analytics", Attributes={}) queues.add(analytics_queue) print(f"Created standard queue with URL: {analytics_queue.url}.") topic = fifo_topic_wrapper.create_fifo_topic("price-updates-topic.fifo") print(f"Created FIFO topic: {topic.attributes['TopicArn']}.") for q in queues: fifo_topic_wrapper.add_access_policy(q, topic.attributes["TopicArn"]) print(f"Added access policies for topic: {topic.attributes['TopicArn']}.") for q in queues: sub = fifo_topic_wrapper.subscribe_queue_to_topic( topic, q.attributes["QueueArn"] ) subscriptions.add(sub) print(f"Subscribed queues to topic: {topic.attributes['TopicArn']}.") input("Press Enter to publish a message to the topic.") message_id = fifo_topic_wrapper.publish_price_update( topic, '{"product": 214, "price": 79.99}', "Consumables" ) print(f"Published price update with message ID: {message_id}.") # Clean up the subscriptions, queues, and topic. input("Press Enter to clean up resources.") for s in subscriptions: sns_wrapper.delete_subscription(s) sns_wrapper.delete_topic(topic) for q in queues: fifo_topic_wrapper.delete_queue(q) print(f"Deleted subscriptions, queues, and topic.") print("Thanks for watching!") print("-" * 88) class FifoTopicWrapper: """Encapsulates Amazon SNS FIFO topic and subscription functions.""" def __init__(self, sns_resource): """ :param sns_resource: A Boto3 Amazon SNS resource. """ self.sns_resource = sns_resource def create_fifo_topic(self, topic_name): """ Create a FIFO topic. Topic names must be made up of only uppercase and lowercase ASCII letters, numbers, underscores, and hyphens, and must be between 1 and 256 characters long. For a FIFO topic, the name must end with the .fifo suffix. :param topic_name: The name for the topic. :return: The new topic. """ try: topic = self.sns_resource.create_topic( Name=topic_name, Attributes={ "FifoTopic": str(True), "ContentBasedDeduplication": str(False), }, ) logger.info("Created FIFO topic with name=%s.", topic_name) return topic except ClientError as error: logger.exception("Couldn't create topic with name=%s!", topic_name) raise error @staticmethod def add_access_policy(queue, topic_arn): """ Add the necessary access policy to a queue, so it can receive messages from a topic. :param queue: The queue resource. :param topic_arn: The ARN of the topic. :return: None. """ try: queue.set_attributes( Attributes={ "Policy": json.dumps( { "Version": "2012-10-17", "Statement": [ { "Sid": "test-sid", "Effect": "Allow", "Principal": {"AWS": "*"}, "Action": "SQS:SendMessage", "Resource": queue.attributes["QueueArn"], "Condition": { "ArnLike": {"aws:SourceArn": topic_arn} }, } ], } ) } ) logger.info("Added trust policy to the queue.") except ClientError as error: logger.exception("Couldn't add trust policy to the queue!") raise error @staticmethod def subscribe_queue_to_topic(topic, queue_arn): """ Subscribe a queue to a topic. :param topic: The topic resource. :param queue_arn: The ARN of the queue. :return: The subscription resource. """ try: subscription = topic.subscribe( Protocol="sqs", Endpoint=queue_arn, ) logger.info("The queue is subscribed to the topic.") return subscription except ClientError as error: logger.exception("Couldn't subscribe queue to topic!") raise error @staticmethod def publish_price_update(topic, payload, group_id): """ Compose and publish a message that updates the wholesale price. :param topic: The topic to publish to. :param payload: The message to publish. :param group_id: The group ID for the message. :return: The ID of the message. """ try: att_dict = {"business": {"DataType": "String", "StringValue": "wholesale"}} dedup_id = uuid.uuid4() response = topic.publish( Subject="Price Update", Message=payload, MessageAttributes=att_dict, MessageGroupId=group_id, MessageDeduplicationId=str(dedup_id), ) message_id = response["MessageId"] logger.info("Published message to topic %s.", topic.arn) except ClientError as error: logger.exception("Couldn't publish message to topic %s.", topic.arn) raise error return message_id @staticmethod def delete_queue(queue): """ Removes an SQS queue. When run against an AWS account, it can take up to 60 seconds before the queue is actually deleted. :param queue: The queue to delete. :return: None """ try: queue.delete() logger.info("Deleted queue with URL=%s.", queue.url) except ClientError as error: logger.exception("Couldn't delete queue with URL=%s!", queue.url) raise error

下列程式碼範例示範如何使用 Amazon SMS 發佈 SNS 訊息。

Python 的 SDK (Boto3)
注意

還有更多 on GitHub。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

class SnsWrapper: """Encapsulates Amazon SNS topic and subscription functions.""" def __init__(self, sns_resource): """ :param sns_resource: A Boto3 Amazon SNS resource. """ self.sns_resource = sns_resource def publish_text_message(self, phone_number, message): """ Publishes a text message directly to a phone number without need for a subscription. :param phone_number: The phone number that receives the message. This must be in E.164 format. For example, a United States phone number might be +12065550101. :param message: The message to send. :return: The ID of the message. """ try: response = self.sns_resource.meta.client.publish( PhoneNumber=phone_number, Message=message ) message_id = response["MessageId"] logger.info("Published message to %s.", phone_number) except ClientError: logger.exception("Couldn't publish message to %s.", phone_number) raise else: return message_id
  • 如需 API 詳細資訊,請參閱在 Word for Python (Boto3) 中發佈 Word 參考。 AWS SDK API

無伺服器範例

下列程式碼範例示範如何實作 Lambda 函數,該函數接收從 SNS 主題接收訊息所觸發的事件。函數會從事件參數擷取訊息,並記錄每一則訊息的內容。

Python 的 SDK (Boto3)
注意

還有更多 on GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Python 搭配 Lambda 使用 SNS 事件。

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def lambda_handler(event, context): for record in event['Records']: process_message(record) print("done") def process_message(record): try: message = record['Sns']['Message'] print(f"Processed message {message}") # TODO; Process your record here except Exception as e: print("An error occurred") raise e