適用於 Python 的 Amazon SNS 擴充用戶端程式庫 - Amazon Simple Notification Service

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

適用於 Python 的 Amazon SNS 擴充用戶端程式庫

先決條件

下列是使用適用於 Java 的 Amazon SNS 擴充用戶端程式庫的先決條件:

  • AWS 開發套件。

    本頁面上的範例使用 AWS Python SDK Boto3。若要安裝和設定 SDK,請參閱 AWS SDK for Python 說明文件。

  • AWS 帳戶 具有適當登入資料的 。

    若要建立 AWS 帳戶,請導覽至AWS 首頁,然後選擇建立 AWS 帳戶。遵循指示。

    如需憑證的資訊,請參閱 AWS SDK for Python 開發人員指南 中的憑證

  • Python 3.x (或更高版本) 和 pip。

  • 適用於 Python 的 Amazon SNS 擴充用戶端程式庫(也可從 PyPI 取得)。

設定訊息儲存

下列屬性可在 Boto3 Amazon SNS 用戶端主題PlatformEndPoint 物件上使用,以設定 Amazon S3 訊息儲存選項。

  • large_payload_support – 用來儲存大量訊息的 Amazon S3 儲存貯體名稱。

  • message_size_threshold – 將郵件儲存在大型訊息儲存貯體中的臨界值。不可小於 0 或大於 262144。預設值為 262144。

  • always_through_s3 – 如果是True,則所有訊息都存放在 Amazon S3 中。預設值為 False

  • s3 – 用於將物件存放在 Amazon S3 中的 Boto3 Amazon S3 resource 物件。如果您想要控制 Amazon S3 資源 (例如自訂 Amazon S3 組態或憑證),請使用此選項。如果先前未在第一次使用時設定,預設值為 boto3.resource("s3")

範例:將訊息發布到 Amazon SNS,其中儲存在 Amazon S3 中的有效酬載

以下程式碼範例顯示做法:

  • 建立範例 Amazon SNS 主題和 Amazon SQS 佇列。

  • 訂閱佇列以接收來自主題的訊息。

  • 發布測試訊息。

  • 訊息酬載儲存在 Amazon S3 中,並發布對該訊息的參考。

  • 列印佇列中已發布的訊息以及從 Amazon S3 擷取的原始訊息。

若要發布大型訊息,請使用適用於 Python 的 Amazon SNS 擴充用戶端程式庫。您傳送的訊息會參考包含實際訊息內容的 Amazon S3 物件。

import boto3 import sns_extended_client from json import loads s3_extended_payload_bucket = "extended-client-bucket-store" TOPIC_NAME = "TOPIC-NAME" QUEUE_NAME = "QUEUE-NAME" # Create an helper to fetch message from S3 def get_msg_from_s3(body): json_msg = loads(body) s3_client = boto3.client("s3") s3_object = s3_client.get_object( Bucket=json_msg[1].get("s3BucketName"), Key=json_msg[1].get("s3Key") ) msg = s3_object.get("Body").read().decode() return msg # Create an helper to fetch and print message SQS queue and S3 def fetch_and_print_from_sqs(sqs, queue_url): """Handy Helper to fetch and print message from SQS queue and S3""" message = sqs.receive_message( QueueUrl=queue_url, MessageAttributeNames=["All"], MaxNumberOfMessages=1 ).get("Messages")[0] message_body = message.get("Body") print("Published Message: {}".format(message_body)) print("Message Stored in S3 Bucket is: {}\n".format(get_msg_from_s3(message_body))) # Initialize the SNS client and create SNS Topic sns_extended_client = boto3.client("sns", region_name="us-east-1") create_topic_response = sns_extended_client.create_topic(Name=TOPIC_NAME) demo_topic_arn = create_topic_response.get("TopicArn") # Create and subscribe an SQS queue to the SNS client sqs = boto3.client("sqs") demo_queue_url = sqs.create_queue(QueueName=QUEUE_NAME).get("QueueUrl") demo_queue_arn = sqs.get_queue_attributes(QueueUrl=demo_queue_url, AttributeNames=["QueueArn"])["Attributes"].get("QueueArn") # Set the RawMessageDelivery subscription attribute to TRUE sns_extended_client.subscribe(TopicArn=demo_topic_arn, Protocol="sqs", Endpoint=demo_queue_arn, Attributes={"RawMessageDelivery":"true"}) sns_extended_client.large_payload_support = s3_extended_payload_bucket # To store all messages content in S3, set always_through_s3 to True # In the example, we set message size threshold as 32 bytes, adjust this threshold as per your usecase # Message will only be uploaded to S3 when its payload size exceeded threshold sns_extended_client.message_size_threshold = 32 sns_extended_client.publish( TopicArn=demo_topic_arn, Message="This message should be published to S3 as it exceeds the message_size_threshold limit", ) # Print message stored in s3 fetch_and_print_from_sqs(sqs, demo_queue_url)

輸出

Published Message: [ "software.amazon.payloadoffloading.PayloadS3Pointer", { "s3BucketName": "extended-client-bucket-store", "s3Key": "xxxx-xxxxx-xxxxx-xxxxxx" } ] Message Stored in S3 Bucket is: This message should be published to S3 as it exceeds the message_size_threshold limit