Word for Python(Boto3)을 사용한 Amazon SDK SQS 예제 - AWS SDK 코드 예제

AWS Doc SDK ExamplesWord AWS SDK 리포지토리에는 더 많은 GitHub 예제가 있습니다.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Word for Python(Boto3)을 사용한 Amazon SDK SQS 예제

다음 코드 예제에서는 Amazon SQS AWS SDK for Python (Boto3) 와 함께를 사용하여 작업을 수행하고 일반적인 시나리오를 구현하는 방법을 보여줍니다.

작업은 대규모 프로그램에서 발췌한 코드이며 컨텍스트에 맞춰 실행해야 합니다. 작업은 개별 서비스 함수를 직접적으로 호출하는 방법을 보여주며 관련 시나리오의 컨텍스트에 맞는 작업을 볼 수 있습니다.

시나리오는 동일한 서비스 내에서 또는 다른 AWS 서비스와 결합된 상태에서 여러 함수를 호출하여 특정 태스크를 수행하는 방법을 보여주는 코드 예제입니다.

각 예제에는 컨텍스트에서 코드를 설정하고 실행하는 방법에 대한 지침을 찾을 수 있는 전체 소스 코드에 대한 링크가 포함되어 있습니다.

작업

다음 코드 예시에서는 CreateQueue을 사용하는 방법을 보여 줍니다.

Python용 SDK(Boto3)
참고

더 많은 on GitHub가 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

def create_queue(name, attributes=None): """ Creates an Amazon SQS queue. :param name: The name of the queue. This is part of the URL assigned to the queue. :param attributes: The attributes of the queue, such as maximum message size or whether it's a FIFO queue. :return: A Queue object that contains metadata about the queue and that can be used to perform queue operations like sending and receiving messages. """ if not attributes: attributes = {} try: queue = sqs.create_queue(QueueName=name, Attributes=attributes) logger.info("Created queue '%s' with URL=%s", name, queue.url) except ClientError as error: logger.exception("Couldn't create queue named '%s'.", name) raise error else: return queue
  • API 세부 정보는 Word for Python(Boto3) CreateQueue 참조의 Word를 참조하세요. AWS SDK API

다음 코드 예시에서는 DeleteMessage을 사용하는 방법을 보여 줍니다.

Python용 SDK(Boto3)
참고

더 많은 on GitHub가 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

def delete_message(message): """ Delete a message from a queue. Clients must delete messages after they are received and processed to remove them from the queue. :param message: The message to delete. The message's queue URL is contained in the message's metadata. :return: None """ try: message.delete() logger.info("Deleted message: %s", message.message_id) except ClientError as error: logger.exception("Couldn't delete message: %s", message.message_id) raise error
  • API 세부 정보는 Word for Python(Boto3) DeleteMessage 참조의 Word를 참조하세요. AWS SDK API

다음 코드 예시에서는 DeleteMessageBatch을 사용하는 방법을 보여 줍니다.

Python용 SDK(Boto3)
참고

더 많은 on GitHub가 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

def delete_messages(queue, messages): """ Delete a batch of messages from a queue in a single request. :param queue: The queue from which to delete the messages. :param messages: The list of messages to delete. :return: The response from SQS that contains the list of successful and failed message deletions. """ try: entries = [ {"Id": str(ind), "ReceiptHandle": msg.receipt_handle} for ind, msg in enumerate(messages) ] response = queue.delete_messages(Entries=entries) if "Successful" in response: for msg_meta in response["Successful"]: logger.info("Deleted %s", messages[int(msg_meta["Id"])].receipt_handle) if "Failed" in response: for msg_meta in response["Failed"]: logger.warning( "Could not delete %s", messages[int(msg_meta["Id"])].receipt_handle ) except ClientError: logger.exception("Couldn't delete messages from queue %s", queue) else: return response
  • API 세부 정보는 Word for Python(Boto3) DeleteMessageBatch 참조의 Word를 참조하세요. AWS SDK API

다음 코드 예시에서는 DeleteQueue을 사용하는 방법을 보여 줍니다.

Python용 SDK(Boto3)
참고

더 많은 on GitHub가 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

def remove_queue(queue): """ Removes an SQS queue. When run against an AWS account, it can take up to 60 seconds before the queue is actually deleted. :param queue: The queue to delete. :return: None """ try: queue.delete() logger.info("Deleted queue with URL=%s.", queue.url) except ClientError as error: logger.exception("Couldn't delete queue with URL=%s!", queue.url) raise error
  • API 세부 정보는 Word for Python(Boto3) DeleteQueue 참조의 Word를 참조하세요. AWS SDK API

다음 코드 예시에서는 GetQueueUrl을 사용하는 방법을 보여 줍니다.

Python용 SDK(Boto3)
참고

더 많은 on GitHub가 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

def get_queue(name): """ Gets an SQS queue by name. :param name: The name that was used to create the queue. :return: A Queue object. """ try: queue = sqs.get_queue_by_name(QueueName=name) logger.info("Got queue '%s' with URL=%s", name, queue.url) except ClientError as error: logger.exception("Couldn't get queue named %s.", name) raise error else: return queue
  • API 세부 정보는 Word for Python(Boto3) GetQueueUrl 참조의 Word를 참조하세요. AWS SDK API

다음 코드 예시에서는 ListQueues을 사용하는 방법을 보여 줍니다.

Python용 SDK(Boto3)
참고

더 많은 on GitHub가 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

def get_queues(prefix=None): """ Gets a list of SQS queues. When a prefix is specified, only queues with names that start with the prefix are returned. :param prefix: The prefix used to restrict the list of returned queues. :return: A list of Queue objects. """ if prefix: queue_iter = sqs.queues.filter(QueueNamePrefix=prefix) else: queue_iter = sqs.queues.all() queues = list(queue_iter) if queues: logger.info("Got queues: %s", ", ".join([q.url for q in queues])) else: logger.warning("No queues found.") return queues
  • API 세부 정보는 Word for Python(Boto3) ListQueues 참조의 Word를 참조하세요. AWS SDK API

다음 코드 예시에서는 ReceiveMessage을 사용하는 방법을 보여 줍니다.

Python용 SDK(Boto3)
참고

더 많은 on GitHub가 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

def receive_messages(queue, max_number, wait_time): """ Receive a batch of messages in a single request from an SQS queue. :param queue: The queue from which to receive messages. :param max_number: The maximum number of messages to receive. The actual number of messages received might be less. :param wait_time: The maximum time to wait (in seconds) before returning. When this number is greater than zero, long polling is used. This can result in reduced costs and fewer false empty responses. :return: The list of Message objects received. These each contain the body of the message and metadata and custom attributes. """ try: messages = queue.receive_messages( MessageAttributeNames=["All"], MaxNumberOfMessages=max_number, WaitTimeSeconds=wait_time, ) for msg in messages: logger.info("Received message: %s: %s", msg.message_id, msg.body) except ClientError as error: logger.exception("Couldn't receive messages from queue: %s", queue) raise error else: return messages
  • API 세부 정보는 Word for Python(Boto3) ReceiveMessage 참조의 Word를 참조하세요. AWS SDK API

다음 코드 예시에서는 SendMessage을 사용하는 방법을 보여 줍니다.

Python용 SDK(Boto3)
참고

더 많은 on GitHub가 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

def send_message(queue, message_body, message_attributes=None): """ Send a message to an Amazon SQS queue. :param queue: The queue that receives the message. :param message_body: The body text of the message. :param message_attributes: Custom attributes of the message. These are key-value pairs that can be whatever you want. :return: The response from SQS that contains the assigned message ID. """ if not message_attributes: message_attributes = {} try: response = queue.send_message( MessageBody=message_body, MessageAttributes=message_attributes ) except ClientError as error: logger.exception("Send message failed: %s", message_body) raise error else: return response
  • API 세부 정보는 Word for Python(Boto3) SendMessage 참조의 Word를 참조하세요. AWS SDK API

다음 코드 예시에서는 SendMessageBatch을 사용하는 방법을 보여 줍니다.

Python용 SDK(Boto3)
참고

더 많은 on GitHub가 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

def send_messages(queue, messages): """ Send a batch of messages in a single request to an SQS queue. This request may return overall success even when some messages were not sent. The caller must inspect the Successful and Failed lists in the response and resend any failed messages. :param queue: The queue to receive the messages. :param messages: The messages to send to the queue. These are simplified to contain only the message body and attributes. :return: The response from SQS that contains the list of successful and failed messages. """ try: entries = [ { "Id": str(ind), "MessageBody": msg["body"], "MessageAttributes": msg["attributes"], } for ind, msg in enumerate(messages) ] response = queue.send_messages(Entries=entries) if "Successful" in response: for msg_meta in response["Successful"]: logger.info( "Message sent: %s: %s", msg_meta["MessageId"], messages[int(msg_meta["Id"])]["body"], ) if "Failed" in response: for msg_meta in response["Failed"]: logger.warning( "Failed to send: %s: %s", msg_meta["MessageId"], messages[int(msg_meta["Id"])]["body"], ) except ClientError as error: logger.exception("Send messages failed to queue: %s", queue) raise error else: return response
  • API 세부 정보는 Word for Python(Boto3) SendMessageBatch 참조의 Word를 참조하세요. AWS SDK API

시나리오

다음 코드 예제는 데이터베이스 테이블에서 메시지 레코드를 검색하는 AWS Step Functions 메신저 애플리케이션을 생성하는 방법을 보여줍니다.

Python용 SDK(Boto3)

AWS SDK for Python (Boto3) 와를 사용하여 Amazon DynamoDB 테이블에서 메시지 레코드를 검색하고 Amazon Simple Queue Service(Amazon SQS)와 함께 보내는 메신저 애플리케이션을 AWS Step Functions 생성하는 방법을 보여줍니다. 상태 시스템은 AWS Lambda 함수와 통합되어 데이터베이스에 전송되지 않은 메시지가 있는지 스캔합니다.

  • Amazon DynamoDB 테이블에서 메시지 레코드를 검색하고 업데이트하는 상태 머신을 생성합니다.

  • 상태 시스템 정의를 업데이트하여 Amazon Simple Queue Service(Amazon SQS)로 메시지도 전송합니다.

  • 상태 머신의 실행을 시작하고 중지합니다.

  • 서비스 통합을 사용하여 상태 시스템에서 Lambda, DynamoDB 및 Amazon SQS에 연결합니다.

전체 소스 코드와 설정 및 실행 방법에 대한 지침은 GitHub의 전체 예제를 참조하세요.

이 예제에서 사용되는 서비스
  • DynamoDB

  • Lambda

  • Amazon SQS

  • Step Functions

다음 코드 예제에서는 대화형 애플리케이션을 통해 Amazon Textract 출력을 탐색하는 방법을 보여줍니다.

Python용 SDK(Boto3)

Amazon Textract AWS SDK for Python (Boto3) 에서를 사용하여 문서 이미지의 텍스트, 양식 및 테이블 요소를 감지하는 방법을 보여줍니다. 입력 이미지와 Amazon Textract 출력은 탐지된 요소를 탐색할 수 있는 Tkinter 애플리케이션에 표시됩니다.

  • 문서 이미지를 Amazon Textract에 제출하고 감지된 요소의 출력을 탐색합니다.

  • Amazon Textract로 직접, 또는 Amazon Simple Storage Service (Amazon S3) 버킷을 통해 이미지를 제출합니다.

  • 비동기 APIs를 사용하여 작업이 완료되면 Amazon Simple Notification Service(Amazon SNS) 주제에 알림을 게시하는 작업을 시작합니다.

  • 작업 완료 메시지에 대해 Amazon Simple Queue Service(Amazon SQS) 대기열을 폴링하고 결과를 표시합니다.

전체 소스 코드와 설정 및 실행 방법에 대한 지침은 GitHub의 전체 예제를 참조하세요.

이 예시에서 사용되는 서비스
  • Amazon S3

  • Amazon SNS

  • Amazon SQS

  • Amazon Textract

다음 코드 예제는 FIFO Amazon SNS 주제를 생성하고 게시하는 방법을 보여줍니다.

Python용 SDK(Boto3)
참고

더 많은 on GitHub가 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

Amazon SNS FIFO 주제를 생성하고, Amazon SQS FIFO 및 주제의 표준 대기열을 구독하고, 주제에 메시지를 게시합니다.

def usage_demo(): """Shows how to subscribe queues to a FIFO topic.""" print("-" * 88) print("Welcome to the `Subscribe queues to a FIFO topic` demo!") print("-" * 88) sns = boto3.resource("sns") sqs = boto3.resource("sqs") fifo_topic_wrapper = FifoTopicWrapper(sns) sns_wrapper = SnsWrapper(sns) prefix = "sqs-subscribe-demo-" queues = set() subscriptions = set() wholesale_queue = sqs.create_queue( QueueName=prefix + "wholesale.fifo", Attributes={ "MaximumMessageSize": str(4096), "ReceiveMessageWaitTimeSeconds": str(10), "VisibilityTimeout": str(300), "FifoQueue": str(True), "ContentBasedDeduplication": str(True), }, ) queues.add(wholesale_queue) print(f"Created FIFO queue with URL: {wholesale_queue.url}.") retail_queue = sqs.create_queue( QueueName=prefix + "retail.fifo", Attributes={ "MaximumMessageSize": str(4096), "ReceiveMessageWaitTimeSeconds": str(10), "VisibilityTimeout": str(300), "FifoQueue": str(True), "ContentBasedDeduplication": str(True), }, ) queues.add(retail_queue) print(f"Created FIFO queue with URL: {retail_queue.url}.") analytics_queue = sqs.create_queue(QueueName=prefix + "analytics", Attributes={}) queues.add(analytics_queue) print(f"Created standard queue with URL: {analytics_queue.url}.") topic = fifo_topic_wrapper.create_fifo_topic("price-updates-topic.fifo") print(f"Created FIFO topic: {topic.attributes['TopicArn']}.") for q in queues: fifo_topic_wrapper.add_access_policy(q, topic.attributes["TopicArn"]) print(f"Added access policies for topic: {topic.attributes['TopicArn']}.") for q in queues: sub = fifo_topic_wrapper.subscribe_queue_to_topic( topic, q.attributes["QueueArn"] ) subscriptions.add(sub) print(f"Subscribed queues to topic: {topic.attributes['TopicArn']}.") input("Press Enter to publish a message to the topic.") message_id = fifo_topic_wrapper.publish_price_update( topic, '{"product": 214, "price": 79.99}', "Consumables" ) print(f"Published price update with message ID: {message_id}.") # Clean up the subscriptions, queues, and topic. input("Press Enter to clean up resources.") for s in subscriptions: sns_wrapper.delete_subscription(s) sns_wrapper.delete_topic(topic) for q in queues: fifo_topic_wrapper.delete_queue(q) print(f"Deleted subscriptions, queues, and topic.") print("Thanks for watching!") print("-" * 88) class FifoTopicWrapper: """Encapsulates Amazon SNS FIFO topic and subscription functions.""" def __init__(self, sns_resource): """ :param sns_resource: A Boto3 Amazon SNS resource. """ self.sns_resource = sns_resource def create_fifo_topic(self, topic_name): """ Create a FIFO topic. Topic names must be made up of only uppercase and lowercase ASCII letters, numbers, underscores, and hyphens, and must be between 1 and 256 characters long. For a FIFO topic, the name must end with the .fifo suffix. :param topic_name: The name for the topic. :return: The new topic. """ try: topic = self.sns_resource.create_topic( Name=topic_name, Attributes={ "FifoTopic": str(True), "ContentBasedDeduplication": str(False), }, ) logger.info("Created FIFO topic with name=%s.", topic_name) return topic except ClientError as error: logger.exception("Couldn't create topic with name=%s!", topic_name) raise error @staticmethod def add_access_policy(queue, topic_arn): """ Add the necessary access policy to a queue, so it can receive messages from a topic. :param queue: The queue resource. :param topic_arn: The ARN of the topic. :return: None. """ try: queue.set_attributes( Attributes={ "Policy": json.dumps( { "Version": "2012-10-17", "Statement": [ { "Sid": "test-sid", "Effect": "Allow", "Principal": {"AWS": "*"}, "Action": "SQS:SendMessage", "Resource": queue.attributes["QueueArn"], "Condition": { "ArnLike": {"aws:SourceArn": topic_arn} }, } ], } ) } ) logger.info("Added trust policy to the queue.") except ClientError as error: logger.exception("Couldn't add trust policy to the queue!") raise error @staticmethod def subscribe_queue_to_topic(topic, queue_arn): """ Subscribe a queue to a topic. :param topic: The topic resource. :param queue_arn: The ARN of the queue. :return: The subscription resource. """ try: subscription = topic.subscribe( Protocol="sqs", Endpoint=queue_arn, ) logger.info("The queue is subscribed to the topic.") return subscription except ClientError as error: logger.exception("Couldn't subscribe queue to topic!") raise error @staticmethod def publish_price_update(topic, payload, group_id): """ Compose and publish a message that updates the wholesale price. :param topic: The topic to publish to. :param payload: The message to publish. :param group_id: The group ID for the message. :return: The ID of the message. """ try: att_dict = {"business": {"DataType": "String", "StringValue": "wholesale"}} dedup_id = uuid.uuid4() response = topic.publish( Subject="Price Update", Message=payload, MessageAttributes=att_dict, MessageGroupId=group_id, MessageDeduplicationId=str(dedup_id), ) message_id = response["MessageId"] logger.info("Published message to topic %s.", topic.arn) except ClientError as error: logger.exception("Couldn't publish message to topic %s.", topic.arn) raise error return message_id @staticmethod def delete_queue(queue): """ Removes an SQS queue. When run against an AWS account, it can take up to 60 seconds before the queue is actually deleted. :param queue: The queue to delete. :return: None """ try: queue.delete() logger.info("Deleted queue with URL=%s.", queue.url) except ClientError as error: logger.exception("Couldn't delete queue with URL=%s!", queue.url) raise error

다음 코드 예시는 다음과 같은 작업을 수행하는 방법을 보여줍니다.

  • Amazon SQS 대기열을 생성합니다.

  • 대기열에 메시지를 일괄 전송합니다.

  • 대기열에서 메시지를 일괄 수신합니다.

  • 대기열에서 메시지 배치를 삭제합니다.

Python용 SDK(Boto3)
참고

더 많은 on GitHub가 있습니다. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

Amazon SQS 메시지 함수를 래핑하는 함수를 생성합니다.

import logging import sys import boto3 from botocore.exceptions import ClientError import queue_wrapper logger = logging.getLogger(__name__) sqs = boto3.resource("sqs") def send_messages(queue, messages): """ Send a batch of messages in a single request to an SQS queue. This request may return overall success even when some messages were not sent. The caller must inspect the Successful and Failed lists in the response and resend any failed messages. :param queue: The queue to receive the messages. :param messages: The messages to send to the queue. These are simplified to contain only the message body and attributes. :return: The response from SQS that contains the list of successful and failed messages. """ try: entries = [ { "Id": str(ind), "MessageBody": msg["body"], "MessageAttributes": msg["attributes"], } for ind, msg in enumerate(messages) ] response = queue.send_messages(Entries=entries) if "Successful" in response: for msg_meta in response["Successful"]: logger.info( "Message sent: %s: %s", msg_meta["MessageId"], messages[int(msg_meta["Id"])]["body"], ) if "Failed" in response: for msg_meta in response["Failed"]: logger.warning( "Failed to send: %s: %s", msg_meta["MessageId"], messages[int(msg_meta["Id"])]["body"], ) except ClientError as error: logger.exception("Send messages failed to queue: %s", queue) raise error else: return response def receive_messages(queue, max_number, wait_time): """ Receive a batch of messages in a single request from an SQS queue. :param queue: The queue from which to receive messages. :param max_number: The maximum number of messages to receive. The actual number of messages received might be less. :param wait_time: The maximum time to wait (in seconds) before returning. When this number is greater than zero, long polling is used. This can result in reduced costs and fewer false empty responses. :return: The list of Message objects received. These each contain the body of the message and metadata and custom attributes. """ try: messages = queue.receive_messages( MessageAttributeNames=["All"], MaxNumberOfMessages=max_number, WaitTimeSeconds=wait_time, ) for msg in messages: logger.info("Received message: %s: %s", msg.message_id, msg.body) except ClientError as error: logger.exception("Couldn't receive messages from queue: %s", queue) raise error else: return messages def delete_messages(queue, messages): """ Delete a batch of messages from a queue in a single request. :param queue: The queue from which to delete the messages. :param messages: The list of messages to delete. :return: The response from SQS that contains the list of successful and failed message deletions. """ try: entries = [ {"Id": str(ind), "ReceiptHandle": msg.receipt_handle} for ind, msg in enumerate(messages) ] response = queue.delete_messages(Entries=entries) if "Successful" in response: for msg_meta in response["Successful"]: logger.info("Deleted %s", messages[int(msg_meta["Id"])].receipt_handle) if "Failed" in response: for msg_meta in response["Failed"]: logger.warning( "Could not delete %s", messages[int(msg_meta["Id"])].receipt_handle ) except ClientError: logger.exception("Couldn't delete messages from queue %s", queue) else: return response

래퍼 함수를 사용하여 메시지를 일괄적으로 보내고 받을 수 있습니다.

def usage_demo(): """ Shows how to: * Read the lines from this Python file and send the lines in batches of 10 as messages to a queue. * Receive the messages in batches until the queue is empty. * Reassemble the lines of the file and verify they match the original file. """ def pack_message(msg_path, msg_body, msg_line): return { "body": msg_body, "attributes": { "path": {"StringValue": msg_path, "DataType": "String"}, "line": {"StringValue": str(msg_line), "DataType": "String"}, }, } def unpack_message(msg): return ( msg.message_attributes["path"]["StringValue"], msg.body, int(msg.message_attributes["line"]["StringValue"]), ) print("-" * 88) print("Welcome to the Amazon Simple Queue Service (Amazon SQS) demo!") print("-" * 88) queue = queue_wrapper.create_queue("sqs-usage-demo-message-wrapper") with open(__file__) as file: lines = file.readlines() line = 0 batch_size = 10 received_lines = [None] * len(lines) print(f"Sending file lines in batches of {batch_size} as messages.") while line < len(lines): messages = [ pack_message(__file__, lines[index], index) for index in range(line, min(line + batch_size, len(lines))) ] line = line + batch_size send_messages(queue, messages) print(".", end="") sys.stdout.flush() print(f"Done. Sent {len(lines) - 1} messages.") print(f"Receiving, handling, and deleting messages in batches of {batch_size}.") more_messages = True while more_messages: received_messages = receive_messages(queue, batch_size, 2) print(".", end="") sys.stdout.flush() for message in received_messages: path, body, line = unpack_message(message) received_lines[line] = body if received_messages: delete_messages(queue, received_messages) else: more_messages = False print("Done.") if all([lines[index] == received_lines[index] for index in range(len(lines))]): print(f"Successfully reassembled all file lines!") else: print(f"Uh oh, some lines were missed!") queue.delete() print("Thanks for watching!") print("-" * 88)

서버리스 예제

다음 코드 예제는 SQS 대기열에서 메시지를 수신하여 트리거된 이벤트를 수신하는 Lambda 함수를 구현하는 방법을 보여줍니다. 함수는 이벤트 파라미터에서 메시지를 검색하고 각 메시지의 내용을 로깅합니다.

Python용 SDK(Boto3)
참고

더 많은 on GitHub가 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

Python을 사용하여 Lambda로 SQS 이벤트 사용.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def lambda_handler(event, context): for message in event['Records']: process_message(message) print("done") def process_message(message): try: print(f"Processed message {message['body']}") # TODO: Do interesting work based on the new message except Exception as err: print("An error occurred") raise err

다음 코드 예제는 SQS 대기열에서 이벤트를 수신하는 Lambda 함수에 대해 부분 배치 응답을 구현하는 방법을 보여줍니다. 이 함수는 응답으로 배치 항목 실패를 보고하고 나중에 해당 메시지를 다시 시도하도록 Lambda에 신호를 보냅니다.

Python용 SDK(Boto3)
참고

더 많은 on GitHub가 있습니다. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

Python을 사용하여 Lambda로 SQS 배치 항목 실패를 보고합니다.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def lambda_handler(event, context): if event: batch_item_failures = [] sqs_batch_response = {} for record in event["Records"]: try: # process message except Exception as e: batch_item_failures.append({"itemIdentifier": record['messageId']}) sqs_batch_response["batchItemFailures"] = batch_item_failures return sqs_batch_response