Ejemplos de código para FIFO temas - Amazon Simple Notification Service

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Ejemplos de código para FIFO temas

Puede usar los siguientes ejemplos de código para integrar el caso práctico de administración de precios de autopartes utilizando un SNS FIFO tema de Amazon con una SQS FIFO cola de Amazon o una cola estándar.

Uso de una AWS SDK

Con un AWS SDK, se crea un SNS FIFO tema de Amazon estableciendo su FifoTopic atributo entrue. Para crear una SQS FIFO cola de Amazon, defina true su FifoQueue atributo en. Además, debe añadir el .fifo sufijo al nombre de cada FIFO recurso. Después de crear un FIFO tema o una cola, no puede convertirlo en un tema o una cola estándar.

Los siguientes ejemplos de código crean estos recursos de cola FIFO y los estándares:

  • El SNS FIFO tema de Amazon que distribuye las actualizaciones de precios

  • Las SQS FIFO colas de Amazon que proporcionan estas actualizaciones a las aplicaciones de venta mayorista y minorista

  • La cola SQS estándar de Amazon para la aplicación de análisis que almacena registros, que se pueden consultar para la inteligencia empresarial (BI)

  • SNSFIFOLas suscripciones de Amazon que conectan las tres colas con el tema

En este ejemplo, se establecen las Políticas de filtrado en las suscripciones. Si prueba el ejemplo al publicar un mensaje en el tema, asegúrese de publicar el mensaje con el atributo de business. Especifique retail o wholesale en el valor de atributo. De lo contrario, el mensaje se filtra y no se entrega a las colas suscritas. Para obtener más información, consulte Filtrado de mensajes por FIFO temas.

Java
SDKpara Java 2.x
nota

Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS.

Este ejemplo

  • crea un SNS FIFO tema de Amazon, dos SQS FIFO colas de Amazon y una cola estándar.

  • suscribe las colas al tema y publica un mensaje en el tema.

La prueba verifica la recepción del mensaje en cada cola. El ejemplo completo también muestra la adición de políticas de acceso y, al final, elimina los recursos.

public class PriceUpdateExample { public final static SnsClient snsClient = SnsClient.create(); public final static SqsClient sqsClient = SqsClient.create(); public static void main(String[] args) { final String usage = "\n" + "Usage: " + " <topicName> <wholesaleQueueFifoName> <retailQueueFifoName> <analyticsQueueName>\n\n" + "Where:\n" + " fifoTopicName - The name of the FIFO topic that you want to create. \n\n" + " wholesaleQueueARN - The name of a SQS FIFO queue that will be created for the wholesale consumer. \n\n" + " retailQueueARN - The name of a SQS FIFO queue that will created for the retail consumer. \n\n" + " analyticsQueueARN - The name of a SQS standard queue that will be created for the analytics consumer. \n\n"; if (args.length != 4) { System.out.println(usage); System.exit(1); } final String fifoTopicName = args[0]; final String wholeSaleQueueName = args[1]; final String retailQueueName = args[2]; final String analyticsQueueName = args[3]; // For convenience, the QueueData class holds metadata about a queue: ARN, URL, // name and type. List<QueueData> queues = List.of( new QueueData(wholeSaleQueueName, QueueType.FIFO), new QueueData(retailQueueName, QueueType.FIFO), new QueueData(analyticsQueueName, QueueType.Standard)); // Create queues. createQueues(queues); // Create a topic. String topicARN = createFIFOTopic(fifoTopicName); // Subscribe each queue to the topic. subscribeQueues(queues, topicARN); // Allow the newly created topic to send messages to the queues. addAccessPolicyToQueuesFINAL(queues, topicARN); // Publish a sample price update message with payload. publishPriceUpdate(topicARN, "{\"product\": 214, \"price\": 79.99}", "Consumables"); // Clean up resources. deleteSubscriptions(queues); deleteQueues(queues); deleteTopic(topicARN); } public static String createFIFOTopic(String topicName) { try { // Create a FIFO topic by using the SNS service client. Map<String, String> topicAttributes = Map.of( "FifoTopic", "true", "ContentBasedDeduplication", "false"); CreateTopicRequest topicRequest = CreateTopicRequest.builder() .name(topicName) .attributes(topicAttributes) .build(); CreateTopicResponse response = snsClient.createTopic(topicRequest); String topicArn = response.topicArn(); System.out.println("The topic ARN is" + topicArn); return topicArn; } catch (SnsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } return ""; } public static void subscribeQueues(List<QueueData> queues, String topicARN) { queues.forEach(queue -> { SubscribeRequest subscribeRequest = SubscribeRequest.builder() .topicArn(topicARN) .endpoint(queue.queueARN) .protocol("sqs") .build(); // Subscribe to the endpoint by using the SNS service client. // Only Amazon SQS queues can receive notifications from an Amazon SNS FIFO // topic. SubscribeResponse subscribeResponse = snsClient.subscribe(subscribeRequest); System.out.println("The queue [" + queue.queueARN + "] subscribed to the topic [" + topicARN + "]"); queue.subscriptionARN = subscribeResponse.subscriptionArn(); }); } public static void publishPriceUpdate(String topicArn, String payload, String groupId) { try { // Create and publish a message that updates the wholesale price. String subject = "Price Update"; String dedupId = UUID.randomUUID().toString(); String attributeName = "business"; String attributeValue = "wholesale"; MessageAttributeValue msgAttValue = MessageAttributeValue.builder() .dataType("String") .stringValue(attributeValue) .build(); Map<String, MessageAttributeValue> attributes = new HashMap<>(); attributes.put(attributeName, msgAttValue); PublishRequest pubRequest = PublishRequest.builder() .topicArn(topicArn) .subject(subject) .message(payload) .messageGroupId(groupId) .messageDeduplicationId(dedupId) .messageAttributes(attributes) .build(); final PublishResponse response = snsClient.publish(pubRequest); System.out.println(response.messageId()); System.out.println(response.sequenceNumber()); System.out.println("Message was published to " + topicArn); } catch (SnsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } }
Python
SDKpara Python (Boto3)
nota

Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS.

Crea un SNS FIFO tema de Amazon, suscríbete a Amazon SQS FIFO y a las colas estándar para el tema y publica un mensaje en el tema.

def usage_demo(): """Shows how to subscribe queues to a FIFO topic.""" print("-" * 88) print("Welcome to the `Subscribe queues to a FIFO topic` demo!") print("-" * 88) sns = boto3.resource("sns") sqs = boto3.resource("sqs") fifo_topic_wrapper = FifoTopicWrapper(sns) sns_wrapper = SnsWrapper(sns) prefix = "sqs-subscribe-demo-" queues = set() subscriptions = set() wholesale_queue = sqs.create_queue( QueueName=prefix + "wholesale.fifo", Attributes={ "MaximumMessageSize": str(4096), "ReceiveMessageWaitTimeSeconds": str(10), "VisibilityTimeout": str(300), "FifoQueue": str(True), "ContentBasedDeduplication": str(True), }, ) queues.add(wholesale_queue) print(f"Created FIFO queue with URL: {wholesale_queue.url}.") retail_queue = sqs.create_queue( QueueName=prefix + "retail.fifo", Attributes={ "MaximumMessageSize": str(4096), "ReceiveMessageWaitTimeSeconds": str(10), "VisibilityTimeout": str(300), "FifoQueue": str(True), "ContentBasedDeduplication": str(True), }, ) queues.add(retail_queue) print(f"Created FIFO queue with URL: {retail_queue.url}.") analytics_queue = sqs.create_queue(QueueName=prefix + "analytics", Attributes={}) queues.add(analytics_queue) print(f"Created standard queue with URL: {analytics_queue.url}.") topic = fifo_topic_wrapper.create_fifo_topic("price-updates-topic.fifo") print(f"Created FIFO topic: {topic.attributes['TopicArn']}.") for q in queues: fifo_topic_wrapper.add_access_policy(q, topic.attributes["TopicArn"]) print(f"Added access policies for topic: {topic.attributes['TopicArn']}.") for q in queues: sub = fifo_topic_wrapper.subscribe_queue_to_topic( topic, q.attributes["QueueArn"] ) subscriptions.add(sub) print(f"Subscribed queues to topic: {topic.attributes['TopicArn']}.") input("Press Enter to publish a message to the topic.") message_id = fifo_topic_wrapper.publish_price_update( topic, '{"product": 214, "price": 79.99}', "Consumables" ) print(f"Published price update with message ID: {message_id}.") # Clean up the subscriptions, queues, and topic. input("Press Enter to clean up resources.") for s in subscriptions: sns_wrapper.delete_subscription(s) sns_wrapper.delete_topic(topic) for q in queues: fifo_topic_wrapper.delete_queue(q) print(f"Deleted subscriptions, queues, and topic.") print("Thanks for watching!") print("-" * 88) class FifoTopicWrapper: """Encapsulates Amazon SNS FIFO topic and subscription functions.""" def __init__(self, sns_resource): """ :param sns_resource: A Boto3 Amazon SNS resource. """ self.sns_resource = sns_resource def create_fifo_topic(self, topic_name): """ Create a FIFO topic. Topic names must be made up of only uppercase and lowercase ASCII letters, numbers, underscores, and hyphens, and must be between 1 and 256 characters long. For a FIFO topic, the name must end with the .fifo suffix. :param topic_name: The name for the topic. :return: The new topic. """ try: topic = self.sns_resource.create_topic( Name=topic_name, Attributes={ "FifoTopic": str(True), "ContentBasedDeduplication": str(False), }, ) logger.info("Created FIFO topic with name=%s.", topic_name) return topic except ClientError as error: logger.exception("Couldn't create topic with name=%s!", topic_name) raise error @staticmethod def add_access_policy(queue, topic_arn): """ Add the necessary access policy to a queue, so it can receive messages from a topic. :param queue: The queue resource. :param topic_arn: The ARN of the topic. :return: None. """ try: queue.set_attributes( Attributes={ "Policy": json.dumps( { "Version": "2012-10-17", "Statement": [ { "Sid": "test-sid", "Effect": "Allow", "Principal": {"AWS": "*"}, "Action": "SQS:SendMessage", "Resource": queue.attributes["QueueArn"], "Condition": { "ArnLike": {"aws:SourceArn": topic_arn} }, } ], } ) } ) logger.info("Added trust policy to the queue.") except ClientError as error: logger.exception("Couldn't add trust policy to the queue!") raise error @staticmethod def subscribe_queue_to_topic(topic, queue_arn): """ Subscribe a queue to a topic. :param topic: The topic resource. :param queue_arn: The ARN of the queue. :return: The subscription resource. """ try: subscription = topic.subscribe( Protocol="sqs", Endpoint=queue_arn, ) logger.info("The queue is subscribed to the topic.") return subscription except ClientError as error: logger.exception("Couldn't subscribe queue to topic!") raise error @staticmethod def publish_price_update(topic, payload, group_id): """ Compose and publish a message that updates the wholesale price. :param topic: The topic to publish to. :param payload: The message to publish. :param group_id: The group ID for the message. :return: The ID of the message. """ try: att_dict = {"business": {"DataType": "String", "StringValue": "wholesale"}} dedup_id = uuid.uuid4() response = topic.publish( Subject="Price Update", Message=payload, MessageAttributes=att_dict, MessageGroupId=group_id, MessageDeduplicationId=str(dedup_id), ) message_id = response["MessageId"] logger.info("Published message to topic %s.", topic.arn) except ClientError as error: logger.exception("Couldn't publish message to topic %s.", topic.arn) raise error return message_id @staticmethod def delete_queue(queue): """ Removes an SQS queue. When run against an AWS account, it can take up to 60 seconds before the queue is actually deleted. :param queue: The queue to delete. :return: None """ try: queue.delete() logger.info("Deleted queue with URL=%s.", queue.url) except ClientError as error: logger.exception("Couldn't delete queue with URL=%s!", queue.url) raise error
  • Para API obtener más información, consulte los siguientes temas en la sección AWS SDKde referencia sobre Python (Boto3). API

SAP ABAP
SDKpara SAP ABAP
nota

Hay más información GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS.

Crea un FIFO tema, suscribe una SQS FIFO cola de Amazon al tema y publica un mensaje en un SNS tema de Amazon.

" Creates a FIFO topic. " DATA lt_tpc_attributes TYPE /aws1/cl_snstopicattrsmap_w=>tt_topicattributesmap. DATA ls_tpc_attributes TYPE /aws1/cl_snstopicattrsmap_w=>ts_topicattributesmap_maprow. ls_tpc_attributes-key = 'FifoTopic'. ls_tpc_attributes-value = NEW /aws1/cl_snstopicattrsmap_w( iv_value = 'true' ). INSERT ls_tpc_attributes INTO TABLE lt_tpc_attributes. TRY. DATA(lo_create_result) = lo_sns->createtopic( iv_name = iv_topic_name it_attributes = lt_tpc_attributes ). DATA(lv_topic_arn) = lo_create_result->get_topicarn( ). ov_topic_arn = lv_topic_arn. " ov_topic_arn is returned for testing purposes. " MESSAGE 'FIFO topic created' TYPE 'I'. CATCH /aws1/cx_snstopiclimitexcdex. MESSAGE 'Unable to create more topics. You have reached the maximum number of topics allowed.' TYPE 'E'. ENDTRY. " Subscribes an endpoint to an Amazon Simple Notification Service (Amazon SNS) topic. " " Only Amazon Simple Queue Service (Amazon SQS) FIFO queues can be subscribed to an SNS FIFO topic. " TRY. DATA(lo_subscribe_result) = lo_sns->subscribe( iv_topicarn = lv_topic_arn iv_protocol = 'sqs' iv_endpoint = iv_queue_arn ). DATA(lv_subscription_arn) = lo_subscribe_result->get_subscriptionarn( ). ov_subscription_arn = lv_subscription_arn. " ov_subscription_arn is returned for testing purposes. " MESSAGE 'SQS queue was subscribed to SNS topic.' TYPE 'I'. CATCH /aws1/cx_snsnotfoundexception. MESSAGE 'Topic does not exist.' TYPE 'E'. CATCH /aws1/cx_snssubscriptionlmte00. MESSAGE 'Unable to create subscriptions. You have reached the maximum number of subscriptions allowed.' TYPE 'E'. ENDTRY. " Publish message to SNS topic. " TRY. DATA lt_msg_attributes TYPE /aws1/cl_snsmessageattrvalue=>tt_messageattributemap. DATA ls_msg_attributes TYPE /aws1/cl_snsmessageattrvalue=>ts_messageattributemap_maprow. ls_msg_attributes-key = 'Importance'. ls_msg_attributes-value = NEW /aws1/cl_snsmessageattrvalue( iv_datatype = 'String' iv_stringvalue = 'High' ). INSERT ls_msg_attributes INTO TABLE lt_msg_attributes. DATA(lo_result) = lo_sns->publish( iv_topicarn = lv_topic_arn iv_message = 'The price of your mobile plan has been increased from $19 to $23' iv_subject = 'Changes to mobile plan' iv_messagegroupid = 'Update-2' iv_messagededuplicationid = 'Update-2.1' it_messageattributes = lt_msg_attributes ). ov_message_id = lo_result->get_messageid( ). " ov_message_id is returned for testing purposes. " MESSAGE 'Message was published to SNS topic.' TYPE 'I'. CATCH /aws1/cx_snsnotfoundexception. MESSAGE 'Topic does not exist.' TYPE 'E'. ENDTRY.

Recibir mensajes de FIFO suscripciones

Ahora puede recibir actualizaciones de precios en las tres aplicaciones suscritas. Como se muestra enFIFOejemplos de casos de uso de temas, el punto de entrada de cada aplicación de consumo es la SQS cola de Amazon, que su AWS Lambda función correspondiente puede sondear automáticamente. Cuando una SQS cola de Amazon es una fuente de eventos para una función de Lambda, Lambda escala su flota de sondeadores según sea necesario para consumir los mensajes de manera eficiente.

Para obtener más información, consulta Uso AWS Lambda con Amazon SQS en la Guía para AWS Lambda desarrolladores. Para obtener información sobre cómo crear sus propios encuestadores de colas, consulte Recomendaciones para Amazon SQS estándar y FIFO colas en la Guía para desarrolladores de Amazon Simple Queue Service y en ReceiveMessagela Referencia de Amazon Simple Queue Service. API

¿Usando AWS CloudFormation

AWS CloudFormation permite utilizar un archivo de plantilla para crear y configurar un conjunto de AWS recursos juntos como una sola unidad. En esta sección, se incluye una plantilla de ejemplo que crea lo siguiente:

  • El SNS FIFO tema de Amazon que distribuye las actualizaciones de precios

  • Las SQS FIFO colas de Amazon que proporcionan estas actualizaciones a las aplicaciones de venta mayorista y minorista

  • La cola SQS estándar de Amazon para la aplicación de análisis que almacena registros, que se pueden consultar para la inteligencia empresarial (BI)

  • SNSFIFOLas suscripciones de Amazon que conectan las tres colas con el tema

  • Una política de filtro en la que se especifica que las aplicaciones de suscriptor reciben solo las actualizaciones de precios que necesitan.

nota

Si prueba este código de ejemplo al publicar un mensaje en el tema, asegúrese de publicar el mensaje con el atributo de business. Especifique retail o wholesale en el valor de atributo. De lo contrario, el mensaje se filtra y no se entrega a las colas suscritas.

{ "AWSTemplateFormatVersion": "2010-09-09", "Resources": { "PriceUpdatesTopic": { "Type": "AWS::SNS::Topic", "Properties": { "TopicName": "PriceUpdatesTopic.fifo", "FifoTopic": true, "ContentBasedDeduplication": false, "ArchivePolicy": { "MessageRetentionPeriod": "30" } } }, "WholesaleQueue": { "Type": "AWS::SQS::Queue", "Properties": { "QueueName": "WholesaleQueue.fifo", "FifoQueue": true, "ContentBasedDeduplication": false } }, "RetailQueue": { "Type": "AWS::SQS::Queue", "Properties": { "QueueName": "RetailQueue.fifo", "FifoQueue": true, "ContentBasedDeduplication": false } }, "AnalyticsQueue": { "Type": "AWS::SQS::Queue", "Properties": { "QueueName": "AnalyticsQueue" } }, "WholesaleSubscription": { "Type": "AWS::SNS::Subscription", "Properties": { "TopicArn": { "Ref": "PriceUpdatesTopic" }, "Endpoint": { "Fn::GetAtt": [ "WholesaleQueue", "Arn" ] }, "Protocol": "sqs", "RawMessageDelivery": "false", "FilterPolicyScope": "MessageBody", "FilterPolicy": { "business": [ "wholesale" ] } } }, "RetailSubscription": { "Type": "AWS::SNS::Subscription", "Properties": { "TopicArn": { "Ref": "PriceUpdatesTopic" }, "Endpoint": { "Fn::GetAtt": [ "RetailQueue", "Arn" ] }, "Protocol": "sqs", "RawMessageDelivery": "false", "FilterPolicyScope": "MessageBody", "FilterPolicy": { "business": [ "retail" ] } } }, "AnalyticsSubscription": { "Type": "AWS::SNS::Subscription", "Properties": { "TopicArn": { "Ref": "PriceUpdatesTopic" }, "Endpoint": { "Fn::GetAtt": [ "AnalyticsQueue", "Arn" ] }, "Protocol": "sqs", "RawMessageDelivery": "false" } }, "SalesQueuesPolicy": { "Type": "AWS::SQS::QueuePolicy", "Properties": { "PolicyDocument": { "Statement": [ { "Effect": "Allow", "Principal": { "Service": "sns.amazonaws.com" }, "Action": [ "sqs:SendMessage" ], "Resource": "*", "Condition": { "ArnEquals": { "aws:SourceArn": { "Ref": "PriceUpdatesTopic" } } } } ] }, "Queues": [ { "Ref": "WholesaleQueue" }, { "Ref": "RetailQueue" }, { "Ref": "AnalyticsQueue" } ] } } } }

Para obtener más información sobre la implementación de AWS recursos mediante una AWS CloudFormation plantilla, consulte Primeros pasos en la Guía del AWS CloudFormation usuario.