기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
사전 조건
다음은 Python용 Amazon SNS 확장 클라이언트 라이브러리
-
AWS SDK. 이 페이지의 예제에서는 AWS Python SDK Boto3를 사용합니다. SDK를 설치하고 설정하려면 Python용AWS SDK
설명서를 참조하세요. -
적절한 자격 증명이 AWS 계정 있는 . 를 생성하려면 AWS 홈 페이지로
AWS 계정이동한 다음 AWS 계정 생성을 선택합니다. 지침을 따릅니다. 자격 증명에 대한 자세한 내용은 Python용AWS SDK 개발자 안내서에서 자격 증명
을 참조하세요. -
Python 3.x (또는 이후 버전) 및 pip.
-
Python용 Amazon SNS 확장 클라이언트 라이브러리(PyPI
에서도 사용 가능).
메시지 스토리지 구성
아래 속성은 Boto3 Amazon SNS 클라이언트
-
large_payload_support
- 대용량 메시지를 저장할 Amazon S3 버킷 이름입니다. -
use_legacy_attribute
- 인 경우 게시된True
모든 메시지는 현재 예약된 메시지 속성(SQSLargePayloadSize
) 대신 레거시 예약 메시지 속성()을 사용합니다ExtendedPayloadSize
. -
message_size_threshold
— 대용량 메시지 버킷에 메시지를 저장하기 위한 임계값입니다. 보다 작0
거나 보다 클 수 없습니다262144
. 기본값은262144
입니다. -
always_through_s3
—True
인 경우 모든 메시지가 Amazon S3에 저장됩니다. 기본값은False
입니다. -
s3_client
- Amazon S3에client
객체를 저장하는 데 사용할 Boto3 Amazon S3 객체입니다. Amazon S3 클라이언트(예: 사용자 지정 Amazon S3 구성 또는 자격 증명)를 제어하려는 경우이 옵션을 사용합니다. 이전에 설정하지 않은 경우 처음 사용할boto3.client("s3")
때 기본값은 입니다.
예: Amazon S3에 저장된 페이로드로 Amazon SNS에 메시지 게시
다음 코드 예제는 다음과 같은 작업을 수행하는 방법을 보여줍니다.
-
샘플 Amazon SNS 주제 및 Amazon SQS 대기열을 생성합니다.
-
정책을 Amazon SQS 대기열에 연결하여 Amazon SNS 주제에서 메시지를 수신합니다.
-
대기열에서 구독하여 주제의 메시지를 수신합니다.
-
Amazon SNS 확장 클라이언트, 주제 리소스 및 PlatformEndpoint 리소스를 사용하여 테스트 메시지를 게시합니다.
-
메시지 페이로드는 Amazon S3에 저장되고 이에 대한 참조가 게시됩니다.
-
대기열에 게시된 메시지를 Amazon S3에서 검색된 원본 메시지와 함께 인쇄합니다.
대용량 메시지를 게시하려면 Python용 Amazon SNS 확장 클라이언트 라이브러리를 사용하세요. 보내는 메시지는 실제 메시지 내용이 포함된 Amazon S3 객체를 참조합니다.
import boto3
from sns_extended_client import SNSExtendedClientSession
from json import loads
s3_extended_payload_bucket = "extended-client-bucket-store" # S3 bucket with the given bucket name is a resource which is created and accessible with the given AWS credentials
TOPIC_NAME = "---TOPIC-NAME---"
QUEUE_NAME = "---QUEUE-NAME---"
def allow_sns_to_write_to_sqs(topicarn, queuearn):
policy_document = """{{
"Version":"2012-10-17",
"Statement":[
{{
"Sid":"MyPolicy",
"Effect":"Allow",
"Principal" : {{"AWS" : "*"}},
"Action":"SQS:SendMessage",
"Resource": "{}",
"Condition":{{
"ArnEquals":{{
"aws:SourceArn": "{}"
}}
}}
}}
]
}}""".format(queuearn, topicarn)
return policy_document
def get_msg_from_s3(body,sns_extended_client):
"""Handy Helper to fetch message from S3"""
json_msg = loads(body)
s3_object = sns_extended_client.s3_client.get_object(
Bucket=json_msg[1].get("s3BucketName"), Key=json_msg[1].get("s3Key")
)
msg = s3_object.get("Body").read().decode()
return msg
def fetch_and_print_from_sqs(sqs, queue_url,sns_extended_client):
sqs_msg = sqs.receive_message(
QueueUrl=queue_url,
AttributeNames=['All'],
MessageAttributeNames=['All'],
VisibilityTimeout=0,
WaitTimeSeconds=0,
MaxNumberOfMessages=1
).get("Messages")[0]
message_body = sqs_msg.get("Body")
print("Published Message: {}".format(message_body))
print("Message Stored in S3 Bucket is: {}\n".format(get_msg_from_s3(message_body,sns_extended_client)))
# Delete the Processed Message
sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=sqs_msg['ReceiptHandle']
)
sns_extended_client = boto3.client("sns", region_name="us-east-1")
create_topic_response = sns_extended_client.create_topic(Name=TOPIC_NAME)
sns_topic_arn = create_topic_response.get("TopicArn")
# create and subscribe an sqs queue to the sns client
sqs = boto3.client("sqs",region_name="us-east-1")
demo_queue_url = sqs.create_queue(QueueName=QUEUE_NAME).get("QueueUrl")
sqs_queue_arn = sqs.get_queue_attributes(
QueueUrl=demo_queue_url, AttributeNames=["QueueArn"]
)["Attributes"].get("QueueArn")
# Adding policy to SQS queue such that SNS topic can send msg to SQS queue
policy_json = allow_sns_to_write_to_sqs(sns_topic_arn, sqs_queue_arn)
response = sqs.set_queue_attributes(
QueueUrl = demo_queue_url,
Attributes = {
'Policy' : policy_json
}
)
# Set the RawMessageDelivery subscription attribute to TRUE if you want to use
# SQSExtendedClient to help with retrieving msg from S3
sns_extended_client.subscribe(TopicArn=sns_topic_arn, Protocol="sqs",
Endpoint=sqs_queue_arn
, Attributes={"RawMessageDelivery":"true"}
)
sns_extended_client.large_payload_support = s3_extended_payload_bucket
# Change default s3_client attribute of sns_extended_client to use 'us-east-1' region
sns_extended_client.s3_client = boto3.client("s3", region_name="us-east-1")
# Below is the example that all the messages will be sent to the S3 bucket
sns_extended_client.always_through_s3 = True
sns_extended_client.publish(
TopicArn=sns_topic_arn, Message="This message should be published to S3"
)
print("\n\nPublished using SNS extended client:")
fetch_and_print_from_sqs(sqs, demo_queue_url,sns_extended_client) # Prints message stored in s3
# Below is the example that all the messages larger than 32 bytes will be sent to the S3 bucket
print("\nUsing decreased message size threshold:")
sns_extended_client.always_through_s3 = False
sns_extended_client.message_size_threshold = 32
sns_extended_client.publish(
TopicArn=sns_topic_arn,
Message="This message should be published to S3 as it exceeds the limit of the 32 bytes",
)
fetch_and_print_from_sqs(sqs, demo_queue_url,sns_extended_client) # Prints message stored in s3
# Below is the example to publish message using the SNS.Topic resource
sns_extended_client_resource = SNSExtendedClientSession().resource(
"sns", region_name="us-east-1"
)
topic = sns_extended_client_resource.Topic(sns_topic_arn)
topic.large_payload_support = s3_extended_payload_bucket
# Change default s3_client attribute of topic to use 'us-east-1' region
topic.s3_client = boto3.client("s3", region_name="us-east-1")
topic.always_through_s3 = True
# Can Set custom S3 Keys to be used to store objects in S3
topic.publish(
Message="This message should be published to S3 using the topic resource",
MessageAttributes={
"S3Key": {
"DataType": "String",
"StringValue": "347c11c4-a22c-42e4-a6a2-9b5af5b76587",
}
},
)
print("\nPublished using Topic Resource:")
fetch_and_print_from_sqs(sqs, demo_queue_url,topic)
# Below is the example to publish message using the SNS.PlatformEndpoint resource
sns_extended_client_resource = SNSExtendedClientSession().resource(
"sns", region_name="us-east-1"
)
platform_endpoint = sns_extended_client_resource.PlatformEndpoint(sns_topic_arn)
platform_endpoint.large_payload_support = s3_extended_payload_bucket
# Change default s3_client attribute of platform_endpoint to use 'us-east-1' region
platform_endpoint.s3_client = boto3.client("s3", region_name="us-east-1")
platform_endpoint.always_through_s3 = True
# Can Set custom S3 Keys to be used to store objects in S3
platform_endpoint.publish(
Message="This message should be published to S3 using the PlatformEndpoint resource",
MessageAttributes={
"S3Key": {
"DataType": "String",
"StringValue": "247c11c4-a22c-42e4-a6a2-9b5af5b76587",
}
},
)
print("\nPublished using PlatformEndpoint Resource:")
fetch_and_print_from_sqs(sqs, demo_queue_url,platform_endpoint)
출력
Published using SNS extended client:
Published Message: ["software.amazon.payloadoffloading.PayloadS3Pointer", {"s3BucketName": "extended-client-bucket-store", "s3Key": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"}]
Message Stored in S3 Bucket is: This message should be published to S3
Using decreased message size threshold:
Published Message: ["software.amazon.payloadoffloading.PayloadS3Pointer", {"s3BucketName": "extended-client-bucket-store", "s3Key": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"}]
Message Stored in S3 Bucket is: This message should be published to S3 as it exceeds the limit of the 32 bytes
Published using Topic Resource:
Published Message: ["software.amazon.payloadoffloading.PayloadS3Pointer", {"s3BucketName": "extended-client-bucket-store", "s3Key": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"}]
Message Stored in S3 Bucket is: This message should be published to S3 using the topic resource
Published using PlatformEndpoint Resource:
Published Message: ["software.amazon.payloadoffloading.PayloadS3Pointer", {"s3BucketName": "extended-client-bucket-store", "s3Key": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"}]
Message Stored in S3 Bucket is: This message should be published to S3 using the PlatformEndpoint resource