使用 SDK for Python (Boto3) 的 Amazon SNS 示例 - AWS SDK 代码示例

文档 AWS SDK 示例 GitHub 存储库中还有更多 S AWS DK 示例

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 SDK for Python (Boto3) 的 Amazon SNS 示例

以下代码示例向您展示了如何在 Amazon SNS 中使用来执行操作和实现常见场景。 AWS SDK for Python (Boto3)

操作是大型程序的代码摘录,必须在上下文中运行。您可以通过操作了解如何调用单个服务函数,还可以通过函数相关场景的上下文查看操作。

场景是向您演示如何通过在一个服务中调用多个函数或与其他 AWS 服务结合来完成特定任务的代码示例。

每个示例都包含一个指向完整源代码的链接,您可以从中找到有关如何在上下文中设置和运行代码的说明。

操作

以下代码示例演示如何使用 CreateTopic

适用于 Python 的 SDK(Boto3)
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

class SnsWrapper: """Encapsulates Amazon SNS topic and subscription functions.""" def __init__(self, sns_resource): """ :param sns_resource: A Boto3 Amazon SNS resource. """ self.sns_resource = sns_resource def create_topic(self, name): """ Creates a notification topic. :param name: The name of the topic to create. :return: The newly created topic. """ try: topic = self.sns_resource.create_topic(Name=name) logger.info("Created topic %s with ARN %s.", name, topic.arn) except ClientError: logger.exception("Couldn't create topic %s.", name) raise else: return topic
  • 有关 API 的详细信息,请参阅适用CreateTopicPython 的AWS SDK (Boto3) API 参考

以下代码示例演示如何使用 DeleteTopic

适用于 Python 的 SDK(Boto3)
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

class SnsWrapper: """Encapsulates Amazon SNS topic and subscription functions.""" def __init__(self, sns_resource): """ :param sns_resource: A Boto3 Amazon SNS resource. """ self.sns_resource = sns_resource @staticmethod def delete_topic(topic): """ Deletes a topic. All subscriptions to the topic are also deleted. """ try: topic.delete() logger.info("Deleted topic %s.", topic.arn) except ClientError: logger.exception("Couldn't delete topic %s.", topic.arn) raise
  • 有关 API 的详细信息,请参阅适用DeleteTopicPython 的AWS SDK (Boto3) API 参考

以下代码示例演示如何使用 ListSubscriptions

适用于 Python 的 SDK(Boto3)
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

class SnsWrapper: """Encapsulates Amazon SNS topic and subscription functions.""" def __init__(self, sns_resource): """ :param sns_resource: A Boto3 Amazon SNS resource. """ self.sns_resource = sns_resource def list_subscriptions(self, topic=None): """ Lists subscriptions for the current account, optionally limited to a specific topic. :param topic: When specified, only subscriptions to this topic are returned. :return: An iterator that yields the subscriptions. """ try: if topic is None: subs_iter = self.sns_resource.subscriptions.all() else: subs_iter = topic.subscriptions.all() logger.info("Got subscriptions.") except ClientError: logger.exception("Couldn't get subscriptions.") raise else: return subs_iter
  • 有关 API 的详细信息,请参阅适用ListSubscriptionsPython 的AWS SDK (Boto3) API 参考

以下代码示例演示如何使用 ListTopics

适用于 Python 的 SDK(Boto3)
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

class SnsWrapper: """Encapsulates Amazon SNS topic and subscription functions.""" def __init__(self, sns_resource): """ :param sns_resource: A Boto3 Amazon SNS resource. """ self.sns_resource = sns_resource def list_topics(self): """ Lists topics for the current account. :return: An iterator that yields the topics. """ try: topics_iter = self.sns_resource.topics.all() logger.info("Got topics.") except ClientError: logger.exception("Couldn't get topics.") raise else: return topics_iter
  • 有关 API 的详细信息,请参阅适用ListTopicsPython 的AWS SDK (Boto3) API 参考

以下代码示例演示如何使用 Publish

适用于 Python 的 SDK(Boto3)
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

发布包含属性的消息,以便订阅可以根据属性进行筛选。

class SnsWrapper: """Encapsulates Amazon SNS topic and subscription functions.""" def __init__(self, sns_resource): """ :param sns_resource: A Boto3 Amazon SNS resource. """ self.sns_resource = sns_resource @staticmethod def publish_message(topic, message, attributes): """ Publishes a message, with attributes, to a topic. Subscriptions can be filtered based on message attributes so that a subscription receives messages only when specified attributes are present. :param topic: The topic to publish to. :param message: The message to publish. :param attributes: The key-value attributes to attach to the message. Values must be either `str` or `bytes`. :return: The ID of the message. """ try: att_dict = {} for key, value in attributes.items(): if isinstance(value, str): att_dict[key] = {"DataType": "String", "StringValue": value} elif isinstance(value, bytes): att_dict[key] = {"DataType": "Binary", "BinaryValue": value} response = topic.publish(Message=message, MessageAttributes=att_dict) message_id = response["MessageId"] logger.info( "Published message with attributes %s to topic %s.", attributes, topic.arn, ) except ClientError: logger.exception("Couldn't publish message to topic %s.", topic.arn) raise else: return message_id

发布基于订阅用户的协议采取不同形式的消息。

class SnsWrapper: """Encapsulates Amazon SNS topic and subscription functions.""" def __init__(self, sns_resource): """ :param sns_resource: A Boto3 Amazon SNS resource. """ self.sns_resource = sns_resource @staticmethod def publish_multi_message( topic, subject, default_message, sms_message, email_message ): """ Publishes a multi-format message to a topic. A multi-format message takes different forms based on the protocol of the subscriber. For example, an SMS subscriber might receive a short version of the message while an email subscriber could receive a longer version. :param topic: The topic to publish to. :param subject: The subject of the message. :param default_message: The default version of the message. This version is sent to subscribers that have protocols that are not otherwise specified in the structured message. :param sms_message: The version of the message sent to SMS subscribers. :param email_message: The version of the message sent to email subscribers. :return: The ID of the message. """ try: message = { "default": default_message, "sms": sms_message, "email": email_message, } response = topic.publish( Message=json.dumps(message), Subject=subject, MessageStructure="json" ) message_id = response["MessageId"] logger.info("Published multi-format message to topic %s.", topic.arn) except ClientError: logger.exception("Couldn't publish message to topic %s.", topic.arn) raise else: return message_id
  • 有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API Reference》中的 Publish

以下代码示例演示如何使用 SetSubscriptionAttributes

适用于 Python 的 SDK(Boto3)
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

class SnsWrapper: """Encapsulates Amazon SNS topic and subscription functions.""" def __init__(self, sns_resource): """ :param sns_resource: A Boto3 Amazon SNS resource. """ self.sns_resource = sns_resource @staticmethod def add_subscription_filter(subscription, attributes): """ Adds a filter policy to a subscription. A filter policy is a key and a list of values that are allowed. When a message is published, it must have an attribute that passes the filter or it will not be sent to the subscription. :param subscription: The subscription the filter policy is attached to. :param attributes: A dictionary of key-value pairs that define the filter. """ try: att_policy = {key: [value] for key, value in attributes.items()} subscription.set_attributes( AttributeName="FilterPolicy", AttributeValue=json.dumps(att_policy) ) logger.info("Added filter to subscription %s.", subscription.arn) except ClientError: logger.exception( "Couldn't add filter to subscription %s.", subscription.arn ) raise

以下代码示例演示如何使用 Subscribe

适用于 Python 的 SDK(Boto3)
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

将电子邮件地址订阅到主题。

class SnsWrapper: """Encapsulates Amazon SNS topic and subscription functions.""" def __init__(self, sns_resource): """ :param sns_resource: A Boto3 Amazon SNS resource. """ self.sns_resource = sns_resource @staticmethod def subscribe(topic, protocol, endpoint): """ Subscribes an endpoint to the topic. Some endpoint types, such as email, must be confirmed before their subscriptions are active. When a subscription is not confirmed, its Amazon Resource Number (ARN) is set to 'PendingConfirmation'. :param topic: The topic to subscribe to. :param protocol: The protocol of the endpoint, such as 'sms' or 'email'. :param endpoint: The endpoint that receives messages, such as a phone number (in E.164 format) for SMS messages, or an email address for email messages. :return: The newly added subscription. """ try: subscription = topic.subscribe( Protocol=protocol, Endpoint=endpoint, ReturnSubscriptionArn=True ) logger.info("Subscribed %s %s to topic %s.", protocol, endpoint, topic.arn) except ClientError: logger.exception( "Couldn't subscribe %s %s to topic %s.", protocol, endpoint, topic.arn ) raise else: return subscription
  • 有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API 参考》中的 Subscribe

以下代码示例演示如何使用 Unsubscribe

适用于 Python 的 SDK(Boto3)
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

class SnsWrapper: """Encapsulates Amazon SNS topic and subscription functions.""" def __init__(self, sns_resource): """ :param sns_resource: A Boto3 Amazon SNS resource. """ self.sns_resource = sns_resource @staticmethod def delete_subscription(subscription): """ Unsubscribes and deletes a subscription. """ try: subscription.delete() logger.info("Deleted subscription %s.", subscription.arn) except ClientError: logger.exception("Couldn't delete subscription %s.", subscription.arn) raise
  • 有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API 参考》中的 Unsubscribe

场景

以下代码示例展示了如何通过交互式应用程序浏览 Amazon Textract 的输出。

适用于 Python 的 SDK(Boto3)

演示如何 AWS SDK for Python (Boto3) 与 Amazon Textract 配合使用来检测文档图像中的文本、表单和表格元素。输入图像和 Amazon Textract 输出在 Tkinter 应用程序中显示,该应用程序可让您探索检测到的元素。

  • 将文档图像提交到 Amazon Textract 并探索检测到的元素的输出。

  • 将图像直接提交到 Amazon Textract,或通过 Amazon Simple Storage Service(Amazon S3)桶提交图像。

  • 使用异步 APIs 启动任务,该任务在任务完成时向亚马逊简单通知服务 (Amazon SNS) Simple Notification Service 主题发布通知。

  • 轮询 Amazon Simple Queue Service (Amazon SQS) 队列,以获取任务完成消息并显示结果。

有关如何设置和运行的完整源代码和说明,请参阅上的完整示例GitHub

本示例中使用的服务
  • Amazon S3

  • Amazon SNS

  • Amazon SQS

  • Amazon Textract

以下代码示例显示了如何创建并发布到 Amazon SNS FIFO 主题。

适用于 Python 的 SDK(Boto3)
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

创建 Amazon SNS FIFO 主题,将 Amazon SQS FIFO 队列和标准队列订阅到主题,并发布一条消息到主题。

def usage_demo(): """Shows how to subscribe queues to a FIFO topic.""" print("-" * 88) print("Welcome to the `Subscribe queues to a FIFO topic` demo!") print("-" * 88) sns = boto3.resource("sns") sqs = boto3.resource("sqs") fifo_topic_wrapper = FifoTopicWrapper(sns) sns_wrapper = SnsWrapper(sns) prefix = "sqs-subscribe-demo-" queues = set() subscriptions = set() wholesale_queue = sqs.create_queue( QueueName=prefix + "wholesale.fifo", Attributes={ "MaximumMessageSize": str(4096), "ReceiveMessageWaitTimeSeconds": str(10), "VisibilityTimeout": str(300), "FifoQueue": str(True), "ContentBasedDeduplication": str(True), }, ) queues.add(wholesale_queue) print(f"Created FIFO queue with URL: {wholesale_queue.url}.") retail_queue = sqs.create_queue( QueueName=prefix + "retail.fifo", Attributes={ "MaximumMessageSize": str(4096), "ReceiveMessageWaitTimeSeconds": str(10), "VisibilityTimeout": str(300), "FifoQueue": str(True), "ContentBasedDeduplication": str(True), }, ) queues.add(retail_queue) print(f"Created FIFO queue with URL: {retail_queue.url}.") analytics_queue = sqs.create_queue(QueueName=prefix + "analytics", Attributes={}) queues.add(analytics_queue) print(f"Created standard queue with URL: {analytics_queue.url}.") topic = fifo_topic_wrapper.create_fifo_topic("price-updates-topic.fifo") print(f"Created FIFO topic: {topic.attributes['TopicArn']}.") for q in queues: fifo_topic_wrapper.add_access_policy(q, topic.attributes["TopicArn"]) print(f"Added access policies for topic: {topic.attributes['TopicArn']}.") for q in queues: sub = fifo_topic_wrapper.subscribe_queue_to_topic( topic, q.attributes["QueueArn"] ) subscriptions.add(sub) print(f"Subscribed queues to topic: {topic.attributes['TopicArn']}.") input("Press Enter to publish a message to the topic.") message_id = fifo_topic_wrapper.publish_price_update( topic, '{"product": 214, "price": 79.99}', "Consumables" ) print(f"Published price update with message ID: {message_id}.") # Clean up the subscriptions, queues, and topic. input("Press Enter to clean up resources.") for s in subscriptions: sns_wrapper.delete_subscription(s) sns_wrapper.delete_topic(topic) for q in queues: fifo_topic_wrapper.delete_queue(q) print(f"Deleted subscriptions, queues, and topic.") print("Thanks for watching!") print("-" * 88) class FifoTopicWrapper: """Encapsulates Amazon SNS FIFO topic and subscription functions.""" def __init__(self, sns_resource): """ :param sns_resource: A Boto3 Amazon SNS resource. """ self.sns_resource = sns_resource def create_fifo_topic(self, topic_name): """ Create a FIFO topic. Topic names must be made up of only uppercase and lowercase ASCII letters, numbers, underscores, and hyphens, and must be between 1 and 256 characters long. For a FIFO topic, the name must end with the .fifo suffix. :param topic_name: The name for the topic. :return: The new topic. """ try: topic = self.sns_resource.create_topic( Name=topic_name, Attributes={ "FifoTopic": str(True), "ContentBasedDeduplication": str(False), }, ) logger.info("Created FIFO topic with name=%s.", topic_name) return topic except ClientError as error: logger.exception("Couldn't create topic with name=%s!", topic_name) raise error @staticmethod def add_access_policy(queue, topic_arn): """ Add the necessary access policy to a queue, so it can receive messages from a topic. :param queue: The queue resource. :param topic_arn: The ARN of the topic. :return: None. """ try: queue.set_attributes( Attributes={ "Policy": json.dumps( { "Version": "2012-10-17", "Statement": [ { "Sid": "test-sid", "Effect": "Allow", "Principal": {"AWS": "*"}, "Action": "SQS:SendMessage", "Resource": queue.attributes["QueueArn"], "Condition": { "ArnLike": {"aws:SourceArn": topic_arn} }, } ], } ) } ) logger.info("Added trust policy to the queue.") except ClientError as error: logger.exception("Couldn't add trust policy to the queue!") raise error @staticmethod def subscribe_queue_to_topic(topic, queue_arn): """ Subscribe a queue to a topic. :param topic: The topic resource. :param queue_arn: The ARN of the queue. :return: The subscription resource. """ try: subscription = topic.subscribe( Protocol="sqs", Endpoint=queue_arn, ) logger.info("The queue is subscribed to the topic.") return subscription except ClientError as error: logger.exception("Couldn't subscribe queue to topic!") raise error @staticmethod def publish_price_update(topic, payload, group_id): """ Compose and publish a message that updates the wholesale price. :param topic: The topic to publish to. :param payload: The message to publish. :param group_id: The group ID for the message. :return: The ID of the message. """ try: att_dict = {"business": {"DataType": "String", "StringValue": "wholesale"}} dedup_id = uuid.uuid4() response = topic.publish( Subject="Price Update", Message=payload, MessageAttributes=att_dict, MessageGroupId=group_id, MessageDeduplicationId=str(dedup_id), ) message_id = response["MessageId"] logger.info("Published message to topic %s.", topic.arn) except ClientError as error: logger.exception("Couldn't publish message to topic %s.", topic.arn) raise error return message_id @staticmethod def delete_queue(queue): """ Removes an SQS queue. When run against an AWS account, it can take up to 60 seconds before the queue is actually deleted. :param queue: The queue to delete. :return: None """ try: queue.delete() logger.info("Deleted queue with URL=%s.", queue.url) except ClientError as error: logger.exception("Couldn't delete queue with URL=%s!", queue.url) raise error
  • 有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API Reference》中的以下主题。

以下代码示例演示了如何使用 Amazon SNS 发布 SMS 消息。

适用于 Python 的 SDK(Boto3)
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

class SnsWrapper: """Encapsulates Amazon SNS topic and subscription functions.""" def __init__(self, sns_resource): """ :param sns_resource: A Boto3 Amazon SNS resource. """ self.sns_resource = sns_resource def publish_text_message(self, phone_number, message): """ Publishes a text message directly to a phone number without need for a subscription. :param phone_number: The phone number that receives the message. This must be in E.164 format. For example, a United States phone number might be +12065550101. :param message: The message to send. :return: The ID of the message. """ try: response = self.sns_resource.meta.client.publish( PhoneNumber=phone_number, Message=message ) message_id = response["MessageId"] logger.info("Published message to %s.", phone_number) except ClientError: logger.exception("Couldn't publish message to %s.", phone_number) raise else: return message_id
  • 有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API Reference》中的 Publish

无服务器示例

以下代码示例展示了如何实现一个 Lambda 函数,该函数接收因接收来自 SNS 主题的消息而触发的事件。该函数从事件参数检索消息并记录每条消息的内容。

适用于 Python 的 SDK(Boto3)
注意

还有更多相关信息 GitHub。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 Python 将 SNS 事件与 Lambda 结合使用。

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def lambda_handler(event, context): for record in event['Records']: process_message(record) print("done") def process_message(record): try: message = record['Sns']['Message'] print(f"Processed message {message}") # TODO; Process your record here except Exception as e: print("An error occurred") raise e