Publish Amazon SNS messages to Amazon SQS queues using an AWS SDK - AWS SDK Code Examples

There are more AWS SDK examples available in the AWS Doc SDK Examples GitHub repo.

Publish Amazon SNS messages to Amazon SQS queues using an AWS SDK

The following code examples show how to:

  • Create a topic (FIFO or non-FIFO).

  • Subscribe several queues to the topic with an option to apply a filter.

  • Publish messages to the topic.

  • Poll the queues for messages received.

.NET
AWS SDK for .NET
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Run an interactive scenario at a command prompt.

/// <summary>
/// Console application to run a workflow scenario for topics and queues.
/// The scenario creates an SNS topic (FIFO or standard), creates SQS queues,
/// subscribes the queues to the topic, publishes messages, polls the queues,
/// and finally cleans up the resources it created.
/// </summary>
public static class TopicsAndQueues
{
    private static bool _useFifoTopic = false;
    private static bool _useContentBasedDeduplication = false;
    private static string _topicName = null!;
    private static string _topicArn = null!;

    // _queueCount must be declared before the arrays that are sized from it.
    private static readonly int _queueCount = 2;
    private static readonly string[] _queueUrls = new string[_queueCount];
    private static readonly string[] _subscriptionArns = new string[_queueCount];
    private static readonly string[] _tones = { "cheerful", "funny", "serious", "sincere" };
    public static SNSWrapper SnsWrapper { get; set; } = null!;
    public static SQSWrapper SqsWrapper { get; set; } = null!;
    public static bool UseConsole { get; set; } = true;

    static async Task Main(string[] args)
    {
        // Set up dependency injection for Amazon SNS and Amazon SQS.
        using var host = Host.CreateDefaultBuilder(args)
            .ConfigureLogging(logging =>
                logging.AddFilter("System", LogLevel.Debug)
                    .AddFilter<DebugLoggerProvider>("Microsoft", LogLevel.Information)
                    .AddFilter<ConsoleLoggerProvider>("Microsoft", LogLevel.Trace))
            .ConfigureServices((_, services) =>
                services.AddAWSService<IAmazonSQS>()
                    .AddAWSService<IAmazonSimpleNotificationService>()
                    .AddTransient<SNSWrapper>()
                    .AddTransient<SQSWrapper>()
            )
            .Build();

        ServicesSetup(host);
        PrintDescription();

        await RunScenario();
    }

    /// <summary>
    /// Populate the services for use within the console application.
    /// </summary>
    /// <param name="host">The services host.</param>
    private static void ServicesSetup(IHost host)
    {
        SnsWrapper = host.Services.GetRequiredService<SNSWrapper>();
        SqsWrapper = host.Services.GetRequiredService<SQSWrapper>();
    }

    /// <summary>
    /// Run the scenario for working with topics and queues.
    /// </summary>
    /// <returns>True if successful.</returns>
    public static async Task<bool> RunScenario()
    {
        try
        {
            await SetupTopic();

            await SetupQueues();

            await PublishMessages();

            foreach (var queueUrl in _queueUrls)
            {
                var messages = await PollForMessages(queueUrl);
                if (messages.Any())
                {
                    await DeleteMessages(queueUrl, messages);
                }
            }

            await CleanupResources();

            Console.WriteLine("Messaging with topics and queues workflow is complete.");
            return true;
        }
        catch (Exception ex)
        {
            Console.WriteLine(new string('-', 80));
            Console.WriteLine($"There was a problem running the scenario: {ex.Message}");
            // Still attempt cleanup so partially created resources are not left behind.
            await CleanupResources();
            Console.WriteLine(new string('-', 80));
            return false;
        }
    }

    /// <summary>
    /// Print a description for the tasks in the workflow.
    /// </summary>
    private static void PrintDescription()
    {
        Console.WriteLine(new string('-', 80));
        Console.WriteLine($"Welcome to messaging with topics and queues.");

        Console.WriteLine(new string('-', 80));
        Console.WriteLine($"In this workflow, you will create an SNS topic and subscribe {_queueCount} SQS queues to the topic." +
                          $"\r\nYou can select from several options for configuring the topic and the subscriptions for the {_queueCount} queues." +
                          $"\r\nYou can then post to the topic and see the results in the queues.\r\n");

        Console.WriteLine(new string('-', 80));
    }

    /// <summary>
    /// Set up the SNS topic to be used with the queues.
    /// </summary>
    /// <returns>The ARN of the new topic.</returns>
    private static async Task<string> SetupTopic()
    {
        Console.WriteLine(new string('-', 80));
        Console.WriteLine($"SNS topics can be configured as FIFO (First-In-First-Out)." +
                          $"\r\nFIFO topics deliver messages in order and support deduplication and message filtering." +
                          $"\r\nYou can then post to the topic and see the results in the queues.\r\n");

        _useFifoTopic = GetYesNoResponse("Would you like to work with FIFO topics?");

        // The topic name is required for both FIFO and standard topics.
        Console.WriteLine(new string('-', 80));
        _topicName = GetUserResponse("Enter a name for your SNS topic: ", "example-topic");

        if (_useFifoTopic)
        {
            Console.WriteLine(
                "Because you have selected a FIFO topic, '.fifo' must be appended to the topic name.\r\n");

            Console.WriteLine(new string('-', 80));
            Console.WriteLine($"Because you have chosen a FIFO topic, deduplication is supported." +
                              $"\r\nDeduplication IDs are either set in the message or automatically generated " +
                              $"\r\nfrom content using a hash function.\r\n" +
                              $"\r\nIf a message is successfully published to an SNS FIFO topic, any message " +
                              $"\r\npublished and determined to have the same deduplication ID, " +
                              $"\r\nwithin the five-minute deduplication interval, is accepted but not delivered.\r\n" +
                              $"\r\nFor more information about deduplication, " +
                              $"\r\nsee https://docs.aws.amazon.com/sns/latest/dg/fifo-message-dedup.html.");

            _useContentBasedDeduplication = GetYesNoResponse("Use content-based deduplication instead of entering a deduplication ID?");
            Console.WriteLine(new string('-', 80));
        }

        _topicArn = await SnsWrapper.CreateTopicWithName(_topicName, _useFifoTopic, _useContentBasedDeduplication);

        Console.WriteLine($"Your new topic with the name {_topicName}" +
                          $"\r\nand Amazon Resource Name (ARN) {_topicArn}" +
                          $"\r\nhas been created.\r\n");

        Console.WriteLine(new string('-', 80));
        return _topicArn;
    }

    /// <summary>
    /// Set up the queues: create each queue, attach a policy that lets the topic
    /// send to it, and subscribe it to the topic.
    /// </summary>
    /// <returns>Async task.</returns>
    private static async Task SetupQueues()
    {
        Console.WriteLine(new string('-', 80));
        Console.WriteLine($"Now you will create {_queueCount} Amazon Simple Queue Service (Amazon SQS) queues to subscribe to the topic.");

        // Repeat this section for each queue.
        for (int i = 0; i < _queueCount; i++)
        {
            var queueName = GetUserResponse("Enter a name for an Amazon SQS queue: ", $"example-queue-{i}");

            // Only explain this once, and only when it applies.
            if (_useFifoTopic && i == 0)
            {
                Console.WriteLine(
                    "Because you have selected a FIFO topic, '.fifo' must be appended to the queue name.");
            }

            // Queues are created for both FIFO and standard topics.
            var queueUrl = await SqsWrapper.CreateQueueWithName(queueName, _useFifoTopic);

            // Record the URL immediately so cleanup can find the queue if a later step fails.
            _queueUrls[i] = queueUrl;

            Console.WriteLine($"Your new queue with the name {queueName}" +
                              $"\r\nand queue URL {queueUrl}" +
                              $"\r\nhas been created.\r\n");

            if (i == 0)
            {
                Console.WriteLine(
                    $"The queue URL is used to retrieve the queue ARN,\r\n" +
                    $"which is used to create a subscription.");
                Console.WriteLine(new string('-', 80));
            }

            var queueArn = await SqsWrapper.GetQueueArnByUrl(queueUrl);

            if (i == 0)
            {
                Console.WriteLine(
                    $"An AWS Identity and Access Management (IAM) policy must be attached to an SQS queue, enabling it to receive\r\n" +
                    $"messages from an SNS topic");
            }

            await SqsWrapper.SetQueuePolicyForTopic(queueArn, _topicArn, queueUrl);

            await SetupFilters(i, queueArn, queueName);
        }

        Console.WriteLine(new string('-', 80));
    }

    /// <summary>
    /// Subscribe a queue to the topic. For FIFO topics the user is offered the
    /// option of adding a filter policy to the subscription.
    /// </summary>
    /// <param name="queueCount">The zero-based number of this queue.</param>
    /// <param name="queueArn">The ARN of the queue.</param>
    /// <param name="queueName">The name of the queue.</param>
    /// <returns>Async Task.</returns>
    public static async Task SetupFilters(int queueCount, string queueArn, string queueName)
    {
        Console.WriteLine(new string('-', 80));

        string? filterPolicy = null;
        if (_useFifoTopic)
        {
            // Only explain this once.
            if (queueCount == 0)
            {
                Console.WriteLine(
                    "Subscriptions to a FIFO topic can have filters. " +
                    "If you add a filter to this subscription, then only the filtered messages " +
                    "will be received in the queue.");

                Console.WriteLine(
                    "For information about message filtering, " +
                    "see https://docs.aws.amazon.com/sns/latest/dg/sns-message-filtering.html");

                Console.WriteLine(
                    "For this example, you can filter messages by a " +
                    "TONE attribute.");
            }

            var useFilter = GetYesNoResponse($"Filter messages for {queueName}'s subscription to the topic?");
            if (useFilter)
            {
                filterPolicy = CreateFilterPolicy();
            }
        }

        // The subscription is created for both FIFO and standard topics;
        // a null or empty filter policy means no filter is applied.
        var subscriptionArn = await SnsWrapper.SubscribeTopicWithFilter(_topicArn, filterPolicy, queueArn);
        _subscriptionArns[queueCount] = subscriptionArn;

        Console.WriteLine(
            $"The queue {queueName} has been subscribed to the topic {_topicName} " +
            $"with the subscription ARN {subscriptionArn}");
        Console.WriteLine(new string('-', 80));
    }

    /// <summary>
    /// Use user input to create a filter policy for a subscription.
    /// </summary>
    /// <returns>The serialized filter policy, or an empty string if no tone was selected.</returns>
    public static string CreateFilterPolicy()
    {
        Console.WriteLine(new string('-', 80));

        Console.WriteLine(
            $"You can filter messages by one or more of the following " +
            $"TONE attributes.");

        List<string> filterSelections = new List<string>();

        var selectionNumber = 0;
        do
        {
            Console.WriteLine(
                $"Enter a number to add a TONE filter, or enter 0 to stop adding filters.");
            for (int i = 0; i < _tones.Length; i++)
            {
                Console.WriteLine($"\t{i + 1}. {_tones[i]}");
            }

            // Default to the first tone, then to "stop", when not running interactively.
            var selection = GetUserResponse("", filterSelections.Any() ? "0" : "1");
            // A failed parse leaves selectionNumber at 0, which ends the loop.
            int.TryParse(selection, out selectionNumber);

            // Ignore numbers beyond the list instead of indexing out of range.
            if (selectionNumber > 0 && selectionNumber <= _tones.Length
                && !filterSelections.Contains(_tones[selectionNumber - 1]))
            {
                filterSelections.Add(_tones[selectionNumber - 1]);
            }
        } while (selectionNumber != 0);

        if (!filterSelections.Any())
        {
            // An empty value list is not a useful filter; subscribe without one.
            Console.WriteLine(
                "Because you did not select any attributes, no filter will be added to this subscription.");
            return string.Empty;
        }

        var filters = new Dictionary<string, List<string>>
        {
            { "tone", filterSelections }
        };
        string filterPolicy = JsonSerializer.Serialize(filters);
        return filterPolicy;
    }

    /// <summary>
    /// Publish messages using user settings.
    /// </summary>
    /// <returns>Async task.</returns>
    public static async Task PublishMessages()
    {
        Console.WriteLine("Now we can publish messages.");

        var keepSendingMessages = true;
        while (keepSendingMessages)
        {
            // Per-message settings are reset on each pass so that a choice made
            // for one message does not silently carry over to the next.
            string? deduplicationId = null;
            string? toneAttribute = null;
            string? messageGroupId = null;

            Console.WriteLine();
            var message = GetUserResponse("Enter a message to publish.", "This is a sample message");

            if (_useFifoTopic)
            {
                Console.WriteLine("Because you are using a FIFO topic, you must set a message group ID." +
                                  "\r\nAll messages within the same group will be received in the order " +
                                  "they were published.");

                Console.WriteLine();
                messageGroupId = GetUserResponse("Enter a message group ID for this message:", "1");

                if (!_useContentBasedDeduplication)
                {
                    Console.WriteLine("Because you are not using content-based deduplication, " +
                                      "you must enter a deduplication ID.");

                    deduplicationId = GetUserResponse("Enter a deduplication ID for this message.", "1");
                }

                if (GetYesNoResponse("Add an attribute to this message?"))
                {
                    Console.WriteLine("Enter a number for an attribute.");
                    for (int i = 0; i < _tones.Length; i++)
                    {
                        Console.WriteLine($"\t{i + 1}. {_tones[i]}");
                    }

                    var selection = GetUserResponse("", "1");
                    int.TryParse(selection, out var selectionNumber);

                    // Selections are 1-based, so the last valid value equals _tones.Length.
                    if (selectionNumber > 0 && selectionNumber <= _tones.Length)
                    {
                        toneAttribute = _tones[selectionNumber - 1];
                    }
                }
            }

            // Publish for both FIFO and standard topics; the FIFO-only values stay null otherwise.
            var messageID = await SnsWrapper.PublishToTopicWithAttribute(
                _topicArn, message, "tone", toneAttribute, deduplicationId, messageGroupId);

            Console.WriteLine($"Message published with id {messageID}.");

            keepSendingMessages = GetYesNoResponse("Send another message?", false);
        }
    }

    /// <summary>
    /// Poll for the published messages to see the results of the user's choices.
    /// </summary>
    /// <param name="queueUrl">The URL of the queue to poll.</param>
    /// <returns>The messages received from the queue.</returns>
    public static async Task<List<Message>> PollForMessages(string queueUrl)
    {
        Console.WriteLine(new string('-', 80));
        Console.WriteLine($"Now the SQS queue at {queueUrl} will be polled to retrieve the messages." +
                          "\r\nPress any key to continue.");
        if (UseConsole)
        {
            Console.ReadLine();
        }

        // Keep receiving until a request comes back empty.
        var moreMessages = true;
        var messages = new List<Message>();
        while (moreMessages)
        {
            var newMessages = await SqsWrapper.ReceiveMessagesByUrl(queueUrl, 10);

            moreMessages = newMessages.Any();
            if (moreMessages)
            {
                messages.AddRange(newMessages);
            }
        }

        Console.WriteLine($"{messages.Count} message(s) were received by the queue at {queueUrl}.");

        foreach (var message in messages)
        {
            Console.WriteLine("\tMessage:" +
                              $"\n\t{message.Body}");
        }

        Console.WriteLine(new string('-', 80));
        return messages;
    }

    /// <summary>
    /// Delete the messages using handles in a batch.
    /// </summary>
    /// <param name="queueUrl">The URL of the queue that holds the messages.</param>
    /// <param name="messages">The received messages to delete.</param>
    /// <returns>Async task.</returns>
    public static async Task DeleteMessages(string queueUrl, List<Message> messages)
    {
        Console.WriteLine(new string('-', 80));
        Console.WriteLine("Now we can delete the messages in this queue in a batch.");

        await SqsWrapper.DeleteMessageBatchByUrl(queueUrl, messages);
        Console.WriteLine(new string('-', 80));
    }

    /// <summary>
    /// Clean up the resources from the scenario.
    /// </summary>
    /// <returns>Async task.</returns>
    private static async Task CleanupResources()
    {
        Console.WriteLine(new string('-', 80));
        Console.WriteLine($"Clean up resources.");

        try
        {
            foreach (var queueUrl in _queueUrls)
            {
                // Slots stay empty when the scenario failed before creating every queue.
                if (!string.IsNullOrEmpty(queueUrl))
                {
                    var deleteQueue =
                        GetYesNoResponse($"Delete queue with url {queueUrl}?");
                    if (deleteQueue)
                    {
                        await SqsWrapper.DeleteQueueByUrl(queueUrl);
                    }
                }
            }

            foreach (var subscriptionArn in _subscriptionArns)
            {
                if (!string.IsNullOrEmpty(subscriptionArn))
                {
                    await SnsWrapper.UnsubscribeByArn(subscriptionArn);
                }
            }

            // The topic ARN is unset when topic creation itself failed.
            if (!string.IsNullOrEmpty(_topicArn))
            {
                var deleteTopic = GetYesNoResponse($"Delete topic {_topicName}?");
                if (deleteTopic)
                {
                    await SnsWrapper.DeleteTopicByArn(_topicArn);
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Unable to clean up resources. Here's why: {ex.Message}.");
        }

        Console.WriteLine(new string('-', 80));
    }

    /// <summary>
    /// Helper method to get a yes or no response from the user.
    /// </summary>
    /// <param name="question">The question string to print on the console.</param>
    /// <param name="defaultAnswer">Answer to use when the console is not in use.</param>
    /// <returns>True if the user responds with a yes.</returns>
    private static bool GetYesNoResponse(string question, bool defaultAnswer = true)
    {
        if (UseConsole)
        {
            Console.WriteLine(question);
            var ynResponse = Console.ReadLine();
            var response = ynResponse != null &&
                           ynResponse.Equals("y",
                               StringComparison.InvariantCultureIgnoreCase);
            return response;
        }
        // If not using the console, use the default.
        return defaultAnswer;
    }

    /// <summary>
    /// Helper method to get a string response from the user through the console.
    /// </summary>
    /// <param name="question">The question string to print on the console.</param>
    /// <param name="defaultAnswer">Answer to use when the console is not in use or input has ended.</param>
    /// <returns>The non-empty response from the user, or the default answer.</returns>
    private static string GetUserResponse(string question, string defaultAnswer)
    {
        if (UseConsole)
        {
            var response = "";
            while (string.IsNullOrEmpty(response))
            {
                Console.WriteLine(question);
                var line = Console.ReadLine();
                if (line == null)
                {
                    // End of input: fall back to the default instead of prompting forever.
                    return defaultAnswer;
                }
                response = line;
            }
            return response;
        }
        // If not using the console, use the default.
        return defaultAnswer;
    }
}

Create a class that wraps Amazon SQS operations.

/// <summary>
/// Wrapper for Amazon Simple Queue Service (SQS) operations.
/// </summary>
public class SQSWrapper
{
    private readonly IAmazonSQS _amazonSQSClient;

    /// <summary>
    /// Constructor for the Amazon SQS wrapper.
    /// </summary>
    /// <param name="amazonSQS">The injected Amazon SQS client.</param>
    public SQSWrapper(IAmazonSQS amazonSQS)
    {
        _amazonSQSClient = amazonSQS;
    }

    /// <summary>
    /// Create a queue with a specific name.
    /// </summary>
    /// <param name="queueName">The name for the queue.</param>
    /// <param name="useFifoQueue">True to use a FIFO queue.</param>
    /// <returns>The url for the queue.</returns>
    public async Task<string> CreateQueueWithName(string queueName, bool useFifoQueue)
    {
        int maxMessage = 256 * 1024;
        var queueAttributes = new Dictionary<string, string>
        {
            {
                QueueAttributeName.MaximumMessageSize,
                maxMessage.ToString()
            }
        };

        var createQueueRequest = new CreateQueueRequest()
        {
            QueueName = queueName,
            Attributes = queueAttributes
        };

        if (useFifoQueue)
        {
            // Update the name if it is not correct for a FIFO queue.
            if (!queueName.EndsWith(".fifo"))
            {
                createQueueRequest.QueueName = queueName + ".fifo";
            }

            // Add an attribute for a FIFO queue.
            createQueueRequest.Attributes.Add(
                QueueAttributeName.FifoQueue, "true");
        }

        // Send the request prepared above so the FIFO name and the attributes take effect.
        var createResponse = await _amazonSQSClient.CreateQueueAsync(createQueueRequest);
        return createResponse.QueueUrl;
    }

    /// <summary>
    /// Get the ARN for a queue from its URL.
    /// </summary>
    /// <param name="queueUrl">The URL of the queue.</param>
    /// <returns>The ARN of the queue.</returns>
    public async Task<string> GetQueueArnByUrl(string queueUrl)
    {
        var getAttributesRequest = new GetQueueAttributesRequest()
        {
            QueueUrl = queueUrl,
            AttributeNames = new List<string>() { QueueAttributeName.QueueArn }
        };

        var getAttributesResponse = await _amazonSQSClient.GetQueueAttributesAsync(
            getAttributesRequest);

        return getAttributesResponse.QueueARN;
    }

    /// <summary>
    /// Set the policy attribute of a queue for a topic.
    /// The policy allows the SNS service to send messages to the queue, but only
    /// when the source is the given topic.
    /// </summary>
    /// <param name="queueArn">The ARN of the queue.</param>
    /// <param name="topicArn">The ARN of the topic.</param>
    /// <param name="queueUrl">The url for the queue.</param>
    /// <returns>True if successful.</returns>
    public async Task<bool> SetQueuePolicyForTopic(string queueArn, string topicArn, string queueUrl)
    {
        // The service principal must be exactly "sns.amazonaws.com".
        var queuePolicy = "{" +
                                "\"Version\": \"2012-10-17\"," +
                                "\"Statement\": [{" +
                                     "\"Effect\": \"Allow\"," +
                                     "\"Principal\": {" +
                                         $"\"Service\": " +
                                             "\"sns.amazonaws.com\"" +
                                            "}," +
                                     "\"Action\": \"sqs:SendMessage\"," +
                                     $"\"Resource\": \"{queueArn}\"," +
                                      "\"Condition\": {" +
                                           "\"ArnEquals\": {" +
                                                $"\"aws:SourceArn\": \"{topicArn}\"" +
                                            "}" +
                                        "}" +
                                "}]" +
                             "}";
        var attributesResponse = await _amazonSQSClient.SetQueueAttributesAsync(
            new SetQueueAttributesRequest()
            {
                QueueUrl = queueUrl,
                Attributes = new Dictionary<string, string>() { { "Policy", queuePolicy } }
            });
        return attributesResponse.HttpStatusCode == HttpStatusCode.OK;
    }

    /// <summary>
    /// Receive messages from a queue by its URL.
    /// </summary>
    /// <param name="queueUrl">The url of the queue.</param>
    /// <param name="maxMessages">The maximum number of messages to request in one call.</param>
    /// <returns>The list of messages; empty when none were received.</returns>
    public async Task<List<Message>> ReceiveMessagesByUrl(string queueUrl, int maxMessages)
    {
        // Setting WaitTimeSeconds to non-zero enables long polling.
        // For information about long polling, see
        // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html
        var messageResponse = await _amazonSQSClient.ReceiveMessageAsync(
            new ReceiveMessageRequest()
            {
                QueueUrl = queueUrl,
                MaxNumberOfMessages = maxMessages,
                WaitTimeSeconds = 1
            });

        // Guard against a missing collection so callers can always enumerate the result.
        return messageResponse.Messages ?? new List<Message>();
    }

    /// <summary>
    /// Delete a batch of messages from a queue by its url.
    /// Messages are deleted in groups of at most 10, which is the limit for a
    /// single batch request.
    /// </summary>
    /// <param name="queueUrl">The url of the queue.</param>
    /// <param name="messages">The received messages to delete.</param>
    /// <returns>True if every message was deleted (or there was nothing to delete).</returns>
    public async Task<bool> DeleteMessageBatchByUrl(string queueUrl, List<Message> messages)
    {
        const int maxBatchSize = 10;
        var allDeleted = true;

        // An empty list never enters the loop, so no empty batch is sent.
        for (var start = 0; start < messages.Count; start += maxBatchSize)
        {
            var end = start + maxBatchSize < messages.Count
                ? start + maxBatchSize
                : messages.Count;

            var deleteRequest = new DeleteMessageBatchRequest()
            {
                QueueUrl = queueUrl,
                Entries = new List<DeleteMessageBatchRequestEntry>()
            };
            for (var i = start; i < end; i++)
            {
                deleteRequest.Entries.Add(new DeleteMessageBatchRequestEntry()
                {
                    ReceiptHandle = messages[i].ReceiptHandle,
                    Id = messages[i].MessageId
                });
            }

            var deleteResponse = await _amazonSQSClient.DeleteMessageBatchAsync(deleteRequest);

            // Any failed entry means the overall operation was not fully successful.
            if (deleteResponse.Failed != null && deleteResponse.Failed.Count > 0)
            {
                allDeleted = false;
            }
        }

        return allDeleted;
    }

    /// <summary>
    /// Delete a queue by its URL.
    /// </summary>
    /// <param name="queueUrl">The url of the queue.</param>
    /// <returns>True if successful.</returns>
    public async Task<bool> DeleteQueueByUrl(string queueUrl)
    {
        var deleteResponse = await _amazonSQSClient.DeleteQueueAsync(
            new DeleteQueueRequest()
            {
                QueueUrl = queueUrl
            });
        return deleteResponse.HttpStatusCode == HttpStatusCode.OK;
    }
}

Create a class that wraps Amazon SNS operations.

/// <summary>
/// Wrapper for Amazon Simple Notification Service (SNS) operations.
/// </summary>
public class SNSWrapper
{
    private readonly IAmazonSimpleNotificationService _amazonSNSClient;

    /// <summary>
    /// Constructor for the Amazon SNS wrapper.
    /// </summary>
    /// <param name="amazonSNS">The injected Amazon SNS client.</param>
    public SNSWrapper(IAmazonSimpleNotificationService amazonSNS)
    {
        _amazonSNSClient = amazonSNS;
    }

    /// <summary>
    /// Create a new topic with a name and specific FIFO and de-duplication attributes.
    /// </summary>
    /// <param name="topicName">The name for the topic.</param>
    /// <param name="useFifoTopic">True to use a FIFO topic.</param>
    /// <param name="useContentBasedDeduplication">True to use content-based de-duplication.</param>
    /// <returns>The ARN of the new topic.</returns>
    public async Task<string> CreateTopicWithName(string topicName, bool useFifoTopic, bool useContentBasedDeduplication)
    {
        var createTopicRequest = new CreateTopicRequest()
        {
            Name = topicName,
        };

        if (useFifoTopic)
        {
            // Update the name if it is not correct for a FIFO topic.
            if (!topicName.EndsWith(".fifo"))
            {
                createTopicRequest.Name = topicName + ".fifo";
            }

            // Add the attributes from the method parameters.
            createTopicRequest.Attributes = new Dictionary<string, string>
            {
                { "FifoTopic", "true" }
            };
            if (useContentBasedDeduplication)
            {
                createTopicRequest.Attributes.Add("ContentBasedDeduplication", "true");
            }
        }

        var createResponse = await _amazonSNSClient.CreateTopicAsync(createTopicRequest);
        return createResponse.TopicArn;
    }

    /// <summary>
    /// Subscribe a queue to a topic with optional filters.
    /// </summary>
    /// <param name="topicArn">The ARN of the topic.</param>
    /// <param name="filterPolicy">The optional filtering policy for the subscription.
    /// When null or empty, the subscription is created without a filter.</param>
    /// <param name="queueArn">The ARN of the queue.</param>
    /// <returns>The ARN of the new subscription.</returns>
    public async Task<string> SubscribeTopicWithFilter(string topicArn, string? filterPolicy, string queueArn)
    {
        var subscribeRequest = new SubscribeRequest()
        {
            TopicArn = topicArn,
            Protocol = "sqs",
            Endpoint = queueArn
        };

        if (!string.IsNullOrEmpty(filterPolicy))
        {
            subscribeRequest.Attributes = new Dictionary<string, string> { { "FilterPolicy", filterPolicy } };
        }

        var subscribeResponse = await _amazonSNSClient.SubscribeAsync(subscribeRequest);
        return subscribeResponse.SubscriptionArn;
    }

    /// <summary>
    /// Publish a message to a topic with an attribute and optional deduplication and group IDs.
    /// </summary>
    /// <param name="topicArn">The ARN of the topic.</param>
    /// <param name="message">The message to publish.</param>
    /// <param name="attributeName">The optional attribute for the message.</param>
    /// <param name="attributeValue">The optional attribute value for the message.</param>
    /// <param name="deduplicationId">The optional deduplication ID for the message.</param>
    /// <param name="groupId">The optional group ID for the message.</param>
    /// <returns>The ID of the message published.</returns>
    public async Task<string> PublishToTopicWithAttribute(
        string topicArn,
        string message,
        string? attributeName = null,
        string? attributeValue = null,
        string? deduplicationId = null,
        string? groupId = null)
    {
        var publishRequest = new PublishRequest()
        {
            TopicArn = topicArn,
            Message = message,
            MessageDeduplicationId = deduplicationId,
            MessageGroupId = groupId
        };

        // Only add the string attribute when both its name and its value are present;
        // a value without a name cannot be used as a dictionary key.
        if (!string.IsNullOrEmpty(attributeName) && attributeValue != null)
        {
            publishRequest.MessageAttributes = new Dictionary<string, MessageAttributeValue>
            {
                { attributeName, new MessageAttributeValue() { StringValue = attributeValue, DataType = "String"} }
            };
        }

        var publishResponse = await _amazonSNSClient.PublishAsync(publishRequest);
        return publishResponse.MessageId;
    }

    /// <summary>
    /// Unsubscribe from a topic by a subscription ARN.
    /// </summary>
    /// <param name="subscriptionArn">The ARN of the subscription.</param>
    /// <returns>True if successful.</returns>
    public async Task<bool> UnsubscribeByArn(string subscriptionArn)
    {
        var unsubscribeResponse = await _amazonSNSClient.UnsubscribeAsync(
            new UnsubscribeRequest()
            {
                SubscriptionArn = subscriptionArn
            });
        return unsubscribeResponse.HttpStatusCode == HttpStatusCode.OK;
    }

    /// <summary>
    /// Delete a topic by its topic ARN.
    /// </summary>
    /// <param name="topicArn">The ARN of the topic.</param>
    /// <returns>True if successful.</returns>
    public async Task<bool> DeleteTopicByArn(string topicArn)
    {
        var deleteResponse = await _amazonSNSClient.DeleteTopicAsync(
            new DeleteTopicRequest()
            {
                TopicArn = topicArn
            });
        return deleteResponse.HttpStatusCode == HttpStatusCode.OK;
    }
}
C++
SDK for C++
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Aws::Client::ClientConfiguration clientConfig; // Optional: Set to the AWS Region (overrides config file). // clientConfig.region = "us-east-1"; //! Workflow for messaging with topics and queues using Amazon SNS and Amazon SQS. /*! \param clientConfig Aws client configuration. \return bool: Successful completion. */ bool AwsDoc::TopicsAndQueues::messagingWithTopicsAndQueues( const Aws::Client::ClientConfiguration &clientConfiguration) { std::cout << "Welcome to messaging with topics and queues." << std::endl; printAsterisksLine(); std::cout << "In this workflow, you will create an SNS topic and subscribe " << NUMBER_OF_QUEUES << " SQS queues to the topic." << std::endl; std::cout << "You can select from several options for configuring the topic and the subscriptions for the " << NUMBER_OF_QUEUES << " queues." << std::endl; std::cout << "You can then post to the topic and see the results in the queues." << std::endl; Aws::SNS::SNSClient snsClient(clientConfiguration); printAsterisksLine(); std::cout << "SNS topics can be configured as FIFO (First-In-First-Out)." << std::endl; std::cout << "FIFO topics deliver messages in order and support deduplication and message filtering." << std::endl; bool isFifoTopic = askYesNoQuestion( "Would you like to work with FIFO topics? (y/n) "); bool contentBasedDeduplication = false; Aws::String topicName; if (isFifoTopic) { printAsterisksLine(); std::cout << "Because you have chosen a FIFO topic, deduplication is supported." << std::endl; std::cout << "Deduplication IDs are either set in the message or automatically generated " << "from content using a hash function." << std::endl; std::cout << "If a message is successfully published to an SNS FIFO topic, any message " << "published and determined to have the same deduplication ID, " << std::endl; std::cout << "within the five-minute deduplication interval, is accepted but not delivered." 
<< std::endl; std::cout << "For more information about deduplication, " << "see https://docs.aws.amazon.com/sns/latest/dg/fifo-message-dedup.html." << std::endl; contentBasedDeduplication = askYesNoQuestion( "Use content-based deduplication instead of entering a deduplication ID? (y/n) "); } printAsterisksLine(); Aws::SQS::SQSClient sqsClient(clientConfiguration); Aws::Vector<Aws::String> queueURLS; Aws::Vector<Aws::String> subscriptionARNS; Aws::String topicARN; { topicName = askQuestion("Enter a name for your SNS topic. "); // 1. Create an Amazon SNS topic, either FIFO or non-FIFO. Aws::SNS::Model::CreateTopicRequest request; if (isFifoTopic) { request.AddAttributes("FifoTopic", "true"); if (contentBasedDeduplication) { request.AddAttributes("ContentBasedDeduplication", "true"); } topicName = topicName + FIFO_SUFFIX; std::cout << "Because you have selected a FIFO topic, '.fifo' must be appended to the topic name." << std::endl; } request.SetName(topicName); Aws::SNS::Model::CreateTopicOutcome outcome = snsClient.CreateTopic(request); if (outcome.IsSuccess()) { topicARN = outcome.GetResult().GetTopicArn(); std::cout << "Your new topic with the name '" << topicName << "' and the topic Amazon Resource Name (ARN) " << std::endl; std::cout << "'" << topicARN << "' has been created." << std::endl; } else { std::cerr << "Error with TopicsAndQueues::CreateTopic. " << outcome.GetError().GetMessage() << std::endl; cleanUp(topicARN, queueURLS, subscriptionARNS, snsClient, sqsClient); return false; } } printAsterisksLine(); std::cout << "Now you will create " << NUMBER_OF_QUEUES << " SQS queues to subscribe to the topic." << std::endl; Aws::Vector<Aws::String> queueNames; bool filteringMessages = false; bool first = true; for (int i = 1; i <= NUMBER_OF_QUEUES; ++i) { Aws::String queueURL; Aws::String queueName; { printAsterisksLine(); std::ostringstream ostringstream; ostringstream << "Enter a name for " << (first ? "an" : "the next") << " SQS queue. 
"; queueName = askQuestion(ostringstream.str()); // 2. Create an SQS queue. Aws::SQS::Model::CreateQueueRequest request; if (isFifoTopic) { request.AddAttributes(Aws::SQS::Model::QueueAttributeName::FifoQueue, "true"); queueName = queueName + FIFO_SUFFIX; if (first) // Only explain this once. { std::cout << "Because you are creating a FIFO SQS queue, '.fifo' must " << "be appended to the queue name." << std::endl; } } request.SetQueueName(queueName); queueNames.push_back(queueName); Aws::SQS::Model::CreateQueueOutcome outcome = sqsClient.CreateQueue(request); if (outcome.IsSuccess()) { queueURL = outcome.GetResult().GetQueueUrl(); std::cout << "Your new SQS queue with the name '" << queueName << "' and the queue URL " << std::endl; std::cout << "'" << queueURL << "' has been created." << std::endl; } else { std::cerr << "Error with SQS::CreateQueue. " << outcome.GetError().GetMessage() << std::endl; cleanUp(topicARN, queueURLS, subscriptionARNS, snsClient, sqsClient); return false; } } queueURLS.push_back(queueURL); if (first) // Only explain this once. { std::cout << "The queue URL is used to retrieve the queue ARN, which is " << "used to create a subscription." << std::endl; } Aws::String queueARN; { // 3. Get the SQS queue ARN attribute. Aws::SQS::Model::GetQueueAttributesRequest request; request.SetQueueUrl(queueURL); request.AddAttributeNames(Aws::SQS::Model::QueueAttributeName::QueueArn); Aws::SQS::Model::GetQueueAttributesOutcome outcome = sqsClient.GetQueueAttributes(request); if (outcome.IsSuccess()) { const Aws::Map<Aws::SQS::Model::QueueAttributeName, Aws::String> &attributes = outcome.GetResult().GetAttributes(); const auto &iter = attributes.find( Aws::SQS::Model::QueueAttributeName::QueueArn); if (iter != attributes.end()) { queueARN = iter->second; std::cout << "The queue ARN '" << queueARN << "' has been retrieved." << std::endl; } else { std::cerr << "Error ARN attribute not returned by GetQueueAttribute." 
<< std::endl; cleanUp(topicARN, queueURLS, subscriptionARNS, snsClient, sqsClient); return false; } } else { std::cerr << "Error with SQS::GetQueueAttributes. " << outcome.GetError().GetMessage() << std::endl; cleanUp(topicARN, queueURLS, subscriptionARNS, snsClient, sqsClient); return false; } } if (first) { std::cout << "An IAM policy must be attached to an SQS queue, enabling it to receive " "messages from an SNS topic." << std::endl; } { // 4. Set the SQS queue policy attribute with a policy enabling the receipt of SNS messages. Aws::SQS::Model::SetQueueAttributesRequest request; request.SetQueueUrl(queueURL); Aws::String policy = createPolicyForQueue(queueARN, topicARN); request.AddAttributes(Aws::SQS::Model::QueueAttributeName::Policy, policy); Aws::SQS::Model::SetQueueAttributesOutcome outcome = sqsClient.SetQueueAttributes(request); if (outcome.IsSuccess()) { std::cout << "The attributes for the queue '" << queueName << "' were successfully updated." << std::endl; } else { std::cerr << "Error with SQS::SetQueueAttributes. " << outcome.GetError().GetMessage() << std::endl; cleanUp(topicARN, queueURLS, subscriptionARNS, snsClient, sqsClient); return false; } } printAsterisksLine(); { // 5. Subscribe the SQS queue to the SNS topic. Aws::SNS::Model::SubscribeRequest request; request.SetTopicArn(topicARN); request.SetProtocol("sqs"); request.SetEndpoint(queueARN); if (isFifoTopic) { if (first) { std::cout << "Subscriptions to a FIFO topic can have filters." << std::endl; std::cout << "If you add a filter to this subscription, then only the filtered messages " << "will be received in the queue." << std::endl; std::cout << "For information about message filtering, " << "see https://docs.aws.amazon.com/sns/latest/dg/sns-message-filtering.html" << std::endl; std::cout << "For this example, you can filter messages by a \"" << TONE_ATTRIBUTE << "\" attribute." 
<< std::endl; } std::ostringstream ostringstream; ostringstream << "Filter messages for \"" << queueName << "\"'s subscription to the topic \"" << topicName << "\"? (y/n)"; // Add filter if user answers yes. if (askYesNoQuestion(ostringstream.str())) { Aws::String jsonPolicy = getFilterPolicyFromUser(); if (!jsonPolicy.empty()) { filteringMessages = true; std::cout << "This is the filter policy for this subscription." << std::endl; std::cout << jsonPolicy << std::endl; request.AddAttributes("FilterPolicy", jsonPolicy); } else { std::cout << "Because you did not select any attributes, no filter " << "will be added to this subscription." << std::endl; } } } // if (isFifoTopic) Aws::SNS::Model::SubscribeOutcome outcome = snsClient.Subscribe(request); if (outcome.IsSuccess()) { Aws::String subscriptionARN = outcome.GetResult().GetSubscriptionArn(); std::cout << "The queue '" << queueName << "' has been subscribed to the topic '" << "'" << topicName << "'" << std::endl; std::cout << "with the subscription ARN '" << subscriptionARN << "." << std::endl; subscriptionARNS.push_back(subscriptionARN); } else { std::cerr << "Error with TopicsAndQueues::Subscribe. " << outcome.GetError().GetMessage() << std::endl; cleanUp(topicARN, queueURLS, subscriptionARNS, snsClient, sqsClient); return false; } } first = false; } first = true; do { printAsterisksLine(); // 6. Publish a message to the SNS topic. Aws::SNS::Model::PublishRequest request; request.SetTopicArn(topicARN); Aws::String message = askQuestion("Enter a message text to publish. "); request.SetMessage(message); if (isFifoTopic) { if (first) { std::cout << "Because you are using a FIFO topic, you must set a message group ID." << std::endl; std::cout << "All messages within the same group will be received in the " << "order they were published." << std::endl; } Aws::String messageGroupID = askQuestion( "Enter a message group ID for this message. 
"); request.SetMessageGroupId(messageGroupID); if (!contentBasedDeduplication) { if (first) { std::cout << "Because you are not using content-based deduplication, " << "you must enter a deduplication ID." << std::endl; } Aws::String deduplicationID = askQuestion( "Enter a deduplication ID for this message. "); request.SetMessageDeduplicationId(deduplicationID); } } if (filteringMessages && askYesNoQuestion( "Add an attribute to this message? (y/n) ")) { for (size_t i = 0; i < TONES.size(); ++i) { std::cout << " " << (i + 1) << ". " << TONES[i] << std::endl; } int selection = askQuestionForIntRange( "Enter a number for an attribute. ", 1, static_cast<int>(TONES.size())); Aws::SNS::Model::MessageAttributeValue messageAttributeValue; messageAttributeValue.SetDataType("String"); messageAttributeValue.SetStringValue(TONES[selection - 1]); request.AddMessageAttributes(TONE_ATTRIBUTE, messageAttributeValue); } Aws::SNS::Model::PublishOutcome outcome = snsClient.Publish(request); if (outcome.IsSuccess()) { std::cout << "Your message was successfully published." << std::endl; } else { std::cerr << "Error with TopicsAndQueues::Publish. " << outcome.GetError().GetMessage() << std::endl; cleanUp(topicARN, queueURLS, subscriptionARNS, snsClient, sqsClient); return false; } first = false; } while (askYesNoQuestion("Post another message? (y/n) ")); printAsterisksLine(); std::cout << "Now the SQS queue will be polled to retrieve the messages." << std::endl; askQuestion("Press any key to continue...", alwaysTrueTest); for (size_t i = 0; i < queueURLS.size(); ++i) { // 7. Poll an SQS queue for its messages. std::vector<Aws::String> messages; std::vector<Aws::String> receiptHandles; while (true) { Aws::SQS::Model::ReceiveMessageRequest request; request.SetMaxNumberOfMessages(10); request.SetQueueUrl(queueURLS[i]); // Setting WaitTimeSeconds to non-zero enables long polling. 
// For information about long polling, see // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html request.SetWaitTimeSeconds(1); Aws::SQS::Model::ReceiveMessageOutcome outcome = sqsClient.ReceiveMessage(request); if (outcome.IsSuccess()) { const Aws::Vector<Aws::SQS::Model::Message> &newMessages = outcome.GetResult().GetMessages(); if (newMessages.empty()) { break; } else { for (const Aws::SQS::Model::Message &message: newMessages) { messages.push_back(message.GetBody()); receiptHandles.push_back(message.GetReceiptHandle()); } } } else { std::cerr << "Error with SQS::ReceiveMessage. " << outcome.GetError().GetMessage() << std::endl; cleanUp(topicARN, queueURLS, subscriptionARNS, snsClient, sqsClient); return false; } } printAsterisksLine(); if (messages.empty()) { std::cout << "No messages were "; } else if (messages.size() == 1) { std::cout << "One message was "; } else { std::cout << messages.size() << " messages were "; } std::cout << "received by the queue '" << queueNames[i] << "'." << std::endl; for (const Aws::String &message: messages) { std::cout << " Message : '" << message << "'." << std::endl; } // 8. Delete a batch of messages from an SQS queue. if (!receiptHandles.empty()) { Aws::SQS::Model::DeleteMessageBatchRequest request; request.SetQueueUrl(queueURLS[i]); int id = 1; // Ids must be unique within a batch delete request. for (const Aws::String &receiptHandle: receiptHandles) { Aws::SQS::Model::DeleteMessageBatchRequestEntry entry; entry.SetId(std::to_string(id)); ++id; entry.SetReceiptHandle(receiptHandle); request.AddEntries(entry); } Aws::SQS::Model::DeleteMessageBatchOutcome outcome = sqsClient.DeleteMessageBatch(request); if (outcome.IsSuccess()) { std::cout << "The batch deletion of messages was successful." << std::endl; } else { std::cerr << "Error with SQS::DeleteMessageBatch. 
" << outcome.GetError().GetMessage() << std::endl; cleanUp(topicARN, queueURLS, subscriptionARNS, snsClient, sqsClient); return false; } } } return cleanUp(topicARN, queueURLS, subscriptionARNS, snsClient, sqsClient, true); // askUser } bool AwsDoc::TopicsAndQueues::cleanUp(const Aws::String &topicARN, const Aws::Vector<Aws::String> &queueURLS, const Aws::Vector<Aws::String> &subscriptionARNS, const Aws::SNS::SNSClient &snsClient, const Aws::SQS::SQSClient &sqsClient, bool askUser) { bool result = true; printAsterisksLine(); if (!queueURLS.empty() && askUser && askYesNoQuestion("Delete the SQS queues? (y/n) ")) { for (const auto &queueURL: queueURLS) { // 9. Delete an SQS queue. Aws::SQS::Model::DeleteQueueRequest request; request.SetQueueUrl(queueURL); Aws::SQS::Model::DeleteQueueOutcome outcome = sqsClient.DeleteQueue(request); if (outcome.IsSuccess()) { std::cout << "The queue with URL '" << queueURL << "' was successfully deleted." << std::endl; } else { std::cerr << "Error with SQS::DeleteQueue. " << outcome.GetError().GetMessage() << std::endl; result = false; } } for (const auto &subscriptionARN: subscriptionARNS) { // 10. Unsubscribe an SNS subscription. Aws::SNS::Model::UnsubscribeRequest request; request.SetSubscriptionArn(subscriptionARN); Aws::SNS::Model::UnsubscribeOutcome outcome = snsClient.Unsubscribe(request); if (outcome.IsSuccess()) { std::cout << "Unsubscribe of subscription ARN '" << subscriptionARN << "' was successful." << std::endl; } else { std::cerr << "Error with TopicsAndQueues::Unsubscribe. " << outcome.GetError().GetMessage() << std::endl; result = false; } } } printAsterisksLine(); if (!topicARN.empty() && askUser && askYesNoQuestion("Delete the SNS topic? (y/n) ")) { // 11. Delete an SNS topic. 
Aws::SNS::Model::DeleteTopicRequest request; request.SetTopicArn(topicARN); Aws::SNS::Model::DeleteTopicOutcome outcome = snsClient.DeleteTopic(request); if (outcome.IsSuccess()) { std::cout << "The topic with ARN '" << topicARN << "' was successfully deleted." << std::endl; } else { std::cerr << "Error with TopicsAndQueues::DeleteTopicRequest. " << outcome.GetError().GetMessage() << std::endl; result = false; } } return result; } //! Create an IAM policy that gives an SQS queue permission to receive messages from an SNS topic. /*! \sa createPolicyForQueue() \param queueARN: The SQS queue Amazon Resource Name (ARN). \param topicARN: The SNS topic ARN. \return Aws::String: The policy as JSON. */ Aws::String AwsDoc::TopicsAndQueues::createPolicyForQueue(const Aws::String &queueARN, const Aws::String &topicARN) { std::ostringstream policyStream; policyStream << R"({ "Statement": [ { "Effect": "Allow", "Principal": { "Service": "sns.amazonaws.com" }, "Action": "sqs:SendMessage", "Resource": ")" << queueARN << R"(", "Condition": { "ArnEquals": { "aws:SourceArn": ")" << topicARN << R"(" } } } ] })"; return policyStream.str(); }
Go
SDK for Go V2
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Run an interactive scenario at a command prompt.

import ( "context" "encoding/json" "fmt" "log" "strings" "topics_and_queues/actions" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/sns" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/aws/aws-sdk-go-v2/service/sqs/types" "github.com/awsdocs/aws-doc-sdk-examples/gov2/demotools" ) const FIFO_SUFFIX = ".fifo" const TONE_KEY = "tone" var ToneChoices = []string{"cheerful", "funny", "serious", "sincere"} // MessageBody is used to deserialize the body of a message from a JSON string. type MessageBody struct { Message string } // ScenarioRunner separates the steps of this scenario into individual functions so that // they are simpler to read and understand. type ScenarioRunner struct { questioner demotools.IQuestioner snsActor *actions.SnsActions sqsActor *actions.SqsActions } func (runner ScenarioRunner) CreateTopic(ctx context.Context) (string, string, bool, bool) { log.Println("SNS topics can be configured as FIFO (First-In-First-Out) or standard.\n" + "FIFO topics deliver messages in order and support deduplication and message filtering.") isFifoTopic := runner.questioner.AskBool("\nWould you like to work with FIFO topics? (y/n) ", "y") contentBasedDeduplication := false if isFifoTopic { log.Println(strings.Repeat("-", 88)) log.Println("Because you have chosen a FIFO topic, deduplication is supported.\n" + "Deduplication IDs are either set in the message or are automatically generated\n" + "from content using a hash function. If a message is successfully published to\n" + "an SNS FIFO topic, any message published and determined to have the same\n" + "deduplication ID, within the five-minute deduplication interval, is accepted\n" + "but not delivered. For more information about deduplication, see:\n" + "\thttps://docs.aws.amazon.com/sns/latest/dg/fifo-message-dedup.html.") contentBasedDeduplication = runner.questioner.AskBool( "\nDo you want to use content-based deduplication instead of entering a deduplication ID? 
(y/n) ", "y") } log.Println(strings.Repeat("-", 88)) topicName := runner.questioner.Ask("Enter a name for your SNS topic. ") if isFifoTopic { topicName = fmt.Sprintf("%v%v", topicName, FIFO_SUFFIX) log.Printf("Because you have selected a FIFO topic, '%v' must be appended to\n"+ "the topic name.", FIFO_SUFFIX) } topicArn, err := runner.snsActor.CreateTopic(ctx, topicName, isFifoTopic, contentBasedDeduplication) if err != nil { panic(err) } log.Printf("Your new topic with the name '%v' and Amazon Resource Name (ARN) \n"+ "'%v' has been created.", topicName, topicArn) return topicName, topicArn, isFifoTopic, contentBasedDeduplication } func (runner ScenarioRunner) CreateQueue(ctx context.Context, ordinal string, isFifoTopic bool) (string, string) { queueName := runner.questioner.Ask(fmt.Sprintf("Enter a name for the %v SQS queue. ", ordinal)) if isFifoTopic { queueName = fmt.Sprintf("%v%v", queueName, FIFO_SUFFIX) if ordinal == "first" { log.Printf("Because you are creating a FIFO SQS queue, '%v' must "+ "be appended to the queue name.\n", FIFO_SUFFIX) } } queueUrl, err := runner.sqsActor.CreateQueue(ctx, queueName, isFifoTopic) if err != nil { panic(err) } log.Printf("Your new SQS queue with the name '%v' and the queue URL "+ "'%v' has been created.", queueName, queueUrl) return queueName, queueUrl } func (runner ScenarioRunner) SubscribeQueueToTopic( ctx context.Context, queueName string, queueUrl string, topicName string, topicArn string, ordinal string, isFifoTopic bool) (string, bool) { queueArn, err := runner.sqsActor.GetQueueArn(ctx, queueUrl) if err != nil { panic(err) } log.Printf("The ARN of your queue is: %v.\n", queueArn) err = runner.sqsActor.AttachSendMessagePolicy(ctx, queueUrl, queueArn, topicArn) if err != nil { panic(err) } log.Println("Attached an IAM policy to the queue so the SNS topic can send " + "messages to it.") log.Println(strings.Repeat("-", 88)) var filterPolicy map[string][]string if isFifoTopic { if ordinal == "first" { 
log.Println("Subscriptions to a FIFO topic can have filters.\n" + "If you add a filter to this subscription, then only the filtered messages\n" + "will be received in the queue.\n" + "For information about message filtering, see\n" + "\thttps://docs.aws.amazon.com/sns/latest/dg/sns-message-filtering.html\n" + "For this example, you can filter messages by a \"tone\" attribute.") } wantFiltering := runner.questioner.AskBool( fmt.Sprintf("Do you want to filter messages that are sent to \"%v\"\n"+ "from the %v topic? (y/n) ", queueName, topicName), "y") if wantFiltering { log.Println("You can filter messages by one or more of the following \"tone\" attributes.") var toneSelections []string askAboutTones := true for askAboutTones { toneIndex := runner.questioner.AskChoice( "Enter the number of the tone you want to filter by:\n", ToneChoices) toneSelections = append(toneSelections, ToneChoices[toneIndex]) askAboutTones = runner.questioner.AskBool("Do you want to add another tone to the filter? 
(y/n) ", "y") } log.Printf("Your subscription will be filtered to only pass the following tones: %v\n", toneSelections) filterPolicy = map[string][]string{TONE_KEY: toneSelections} } } subscriptionArn, err := runner.snsActor.SubscribeQueue(ctx, topicArn, queueArn, filterPolicy) if err != nil { panic(err) } log.Printf("The queue %v is now subscribed to the topic %v with the subscription ARN %v.\n", queueName, topicName, subscriptionArn) return subscriptionArn, filterPolicy != nil } func (runner ScenarioRunner) PublishMessages(ctx context.Context, topicArn string, isFifoTopic bool, contentBasedDeduplication bool, usingFilters bool) { var message string var groupId string var dedupId string var toneSelection string publishMore := true for publishMore { groupId = "" dedupId = "" toneSelection = "" message = runner.questioner.Ask("Enter a message to publish: ") if isFifoTopic { log.Println("Because you are using a FIFO topic, you must set a message group ID.\n" + "All messages within the same group will be received in the order they were published.") groupId = runner.questioner.Ask("Enter a message group ID: ") if !contentBasedDeduplication { log.Println("Because you are not using content-based deduplication,\n" + "you must enter a deduplication ID.") dedupId = runner.questioner.Ask("Enter a deduplication ID: ") } } if usingFilters { if runner.questioner.AskBool("Add a tone attribute so this message can be filtered? (y/n) ", "y") { toneIndex := runner.questioner.AskChoice( "Enter the number of the tone you want to filter by:\n", ToneChoices) toneSelection = ToneChoices[toneIndex] } } err := runner.snsActor.Publish(ctx, topicArn, message, groupId, dedupId, TONE_KEY, toneSelection) if err != nil { panic(err) } log.Println(("Your message was published.")) publishMore = runner.questioner.AskBool("Do you want to publish another messsage? 
(y/n) ", "y") } } func (runner ScenarioRunner) PollForMessages(ctx context.Context, queueUrls []string) { log.Println("Polling queues for messages...") for _, queueUrl := range queueUrls { var messages []types.Message for { currentMsgs, err := runner.sqsActor.GetMessages(ctx, queueUrl, 10, 1) if err != nil { panic(err) } if len(currentMsgs) == 0 { break } messages = append(messages, currentMsgs...) } if len(messages) == 0 { log.Printf("No messages were received by queue %v.\n", queueUrl) } else if len(messages) == 1 { log.Printf("One message was received by queue %v:\n", queueUrl) } else { log.Printf("%v messages were received by queue %v:\n", len(messages), queueUrl) } for msgIndex, message := range messages { messageBody := MessageBody{} err := json.Unmarshal([]byte(*message.Body), &messageBody) if err != nil { panic(err) } log.Printf("Message %v: %v\n", msgIndex+1, messageBody.Message) } if len(messages) > 0 { log.Printf("Deleting %v messages from queue %v.\n", len(messages), queueUrl) err := runner.sqsActor.DeleteMessages(ctx, queueUrl, messages) if err != nil { panic(err) } } } } // RunTopicsAndQueuesScenario is an interactive example that shows you how to use the // AWS SDK for Go to create and use Amazon SNS topics and Amazon SQS queues. // // 1. Create a topic (FIFO or non-FIFO). // 2. Subscribe several queues to the topic with an option to apply a filter. // 3. Publish messages to the topic. // 4. Poll the queues for messages received. // 5. Delete the topic and the queues. // // This example creates service clients from the specified sdkConfig so that // you can replace it with a mocked or stubbed config for unit testing. // // It uses a questioner from the `demotools` package to get input during the example. // This package can be found in the ..\..\demotools folder of this repo. 
func RunTopicsAndQueuesScenario( ctx context.Context, sdkConfig aws.Config, questioner demotools.IQuestioner) { resources := Resources{} defer func() { if r := recover(); r != nil { log.Println("Something went wrong with the demo.\n" + "Cleaning up any resources that were created...") resources.Cleanup(ctx) } }() queueCount := 2 log.Println(strings.Repeat("-", 88)) log.Printf("Welcome to messaging with topics and queues.\n\n"+ "In this workflow, you will create an SNS topic and subscribe %v SQS queues to the\n"+ "topic. You can select from several options for configuring the topic and the\n"+ "subscriptions for the queues. You can then post to the topic and see the results\n"+ "in the queues.\n", queueCount) log.Println(strings.Repeat("-", 88)) runner := ScenarioRunner{ questioner: questioner, snsActor: &actions.SnsActions{SnsClient: sns.NewFromConfig(sdkConfig)}, sqsActor: &actions.SqsActions{SqsClient: sqs.NewFromConfig(sdkConfig)}, } resources.snsActor = runner.snsActor resources.sqsActor = runner.sqsActor topicName, topicArn, isFifoTopic, contentBasedDeduplication := runner.CreateTopic(ctx) resources.topicArn = topicArn log.Println(strings.Repeat("-", 88)) log.Printf("Now you will create %v SQS queues and subscribe them to the topic.\n", queueCount) ordinals := []string{"first", "next"} usingFilters := false for _, ordinal := range ordinals { queueName, queueUrl := runner.CreateQueue(ctx, ordinal, isFifoTopic) resources.queueUrls = append(resources.queueUrls, queueUrl) _, filtering := runner.SubscribeQueueToTopic(ctx, queueName, queueUrl, topicName, topicArn, ordinal, isFifoTopic) usingFilters = usingFilters || filtering } log.Println(strings.Repeat("-", 88)) runner.PublishMessages(ctx, topicArn, isFifoTopic, contentBasedDeduplication, usingFilters) log.Println(strings.Repeat("-", 88)) runner.PollForMessages(ctx, resources.queueUrls) log.Println(strings.Repeat("-", 88)) wantCleanup := questioner.AskBool("Do you want to remove all AWS resources created for this 
scenario? (y/n) ", "y") if wantCleanup { log.Println("Cleaning up resources...") resources.Cleanup(ctx) } log.Println(strings.Repeat("-", 88)) log.Println("Thanks for watching!") log.Println(strings.Repeat("-", 88)) }

Define a struct that wraps Amazon SNS actions used in this example.

import ( "context" "encoding/json" "log" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/sns" "github.com/aws/aws-sdk-go-v2/service/sns/types" ) // SnsActions encapsulates the Amazon Simple Notification Service (Amazon SNS) actions // used in the examples. type SnsActions struct { SnsClient *sns.Client } // CreateTopic creates an Amazon SNS topic with the specified name. You can optionally // specify that the topic is created as a FIFO topic and whether it uses content-based // deduplication instead of ID-based deduplication. func (actor SnsActions) CreateTopic(ctx context.Context, topicName string, isFifoTopic bool, contentBasedDeduplication bool) (string, error) { var topicArn string topicAttributes := map[string]string{} if isFifoTopic { topicAttributes["FifoTopic"] = "true" } if contentBasedDeduplication { topicAttributes["ContentBasedDeduplication"] = "true" } topic, err := actor.SnsClient.CreateTopic(ctx, &sns.CreateTopicInput{ Name: aws.String(topicName), Attributes: topicAttributes, }) if err != nil { log.Printf("Couldn't create topic %v. Here's why: %v\n", topicName, err) } else { topicArn = *topic.TopicArn } return topicArn, err } // DeleteTopic delete an Amazon SNS topic. func (actor SnsActions) DeleteTopic(ctx context.Context, topicArn string) error { _, err := actor.SnsClient.DeleteTopic(ctx, &sns.DeleteTopicInput{ TopicArn: aws.String(topicArn)}) if err != nil { log.Printf("Couldn't delete topic %v. Here's why: %v\n", topicArn, err) } return err } // SubscribeQueue subscribes an Amazon Simple Queue Service (Amazon SQS) queue to an // Amazon SNS topic. When filterMap is not nil, it is used to specify a filter policy // so that messages are only sent to the queue when the message has the specified attributes. 
func (actor SnsActions) SubscribeQueue(ctx context.Context, topicArn string, queueArn string, filterMap map[string][]string) (string, error) { var subscriptionArn string var attributes map[string]string if filterMap != nil { filterBytes, err := json.Marshal(filterMap) if err != nil { log.Printf("Couldn't create filter policy, here's why: %v\n", err) return "", err } attributes = map[string]string{"FilterPolicy": string(filterBytes)} } output, err := actor.SnsClient.Subscribe(ctx, &sns.SubscribeInput{ Protocol: aws.String("sqs"), TopicArn: aws.String(topicArn), Attributes: attributes, Endpoint: aws.String(queueArn), ReturnSubscriptionArn: true, }) if err != nil { log.Printf("Couldn't susbscribe queue %v to topic %v. Here's why: %v\n", queueArn, topicArn, err) } else { subscriptionArn = *output.SubscriptionArn } return subscriptionArn, err } // Publish publishes a message to an Amazon SNS topic. The message is then sent to all // subscribers. When the topic is a FIFO topic, the message must also contain a group ID // and, when ID-based deduplication is used, a deduplication ID. An optional key-value // filter attribute can be specified so that the message can be filtered according to // a filter policy. func (actor SnsActions) Publish(ctx context.Context, topicArn string, message string, groupId string, dedupId string, filterKey string, filterValue string) error { publishInput := sns.PublishInput{TopicArn: aws.String(topicArn), Message: aws.String(message)} if groupId != "" { publishInput.MessageGroupId = aws.String(groupId) } if dedupId != "" { publishInput.MessageDeduplicationId = aws.String(dedupId) } if filterKey != "" && filterValue != "" { publishInput.MessageAttributes = map[string]types.MessageAttributeValue{ filterKey: {DataType: aws.String("String"), StringValue: aws.String(filterValue)}, } } _, err := actor.SnsClient.Publish(ctx, &publishInput) if err != nil { log.Printf("Couldn't publish message to topic %v. Here's why: %v", topicArn, err) } return err }

Define a struct that wraps Amazon SQS actions used in this example.

import ( "context" "encoding/json" "fmt" "log" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/aws/aws-sdk-go-v2/service/sqs/types" ) // SqsActions encapsulates the Amazon Simple Queue Service (Amazon SQS) actions // used in the examples. type SqsActions struct { SqsClient *sqs.Client } // CreateQueue creates an Amazon SQS queue with the specified name. You can specify // whether the queue is created as a FIFO queue. func (actor SqsActions) CreateQueue(ctx context.Context, queueName string, isFifoQueue bool) (string, error) { var queueUrl string queueAttributes := map[string]string{} if isFifoQueue { queueAttributes["FifoQueue"] = "true" } queue, err := actor.SqsClient.CreateQueue(ctx, &sqs.CreateQueueInput{ QueueName: aws.String(queueName), Attributes: queueAttributes, }) if err != nil { log.Printf("Couldn't create queue %v. Here's why: %v\n", queueName, err) } else { queueUrl = *queue.QueueUrl } return queueUrl, err } // GetQueueArn uses the GetQueueAttributes action to get the Amazon Resource Name (ARN) // of an Amazon SQS queue. func (actor SqsActions) GetQueueArn(ctx context.Context, queueUrl string) (string, error) { var queueArn string arnAttributeName := types.QueueAttributeNameQueueArn attribute, err := actor.SqsClient.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{ QueueUrl: aws.String(queueUrl), AttributeNames: []types.QueueAttributeName{arnAttributeName}, }) if err != nil { log.Printf("Couldn't get ARN for queue %v. Here's why: %v\n", queueUrl, err) } else { queueArn = attribute.Attributes[string(arnAttributeName)] } return queueArn, err } // AttachSendMessagePolicy uses the SetQueueAttributes action to attach a policy to an // Amazon SQS queue that allows the specified Amazon SNS topic to send messages to the // queue. 
func (actor SqsActions) AttachSendMessagePolicy(ctx context.Context, queueUrl string, queueArn string, topicArn string) error { policyDoc := PolicyDocument{ Version: "2012-10-17", Statement: []PolicyStatement{{ Effect: "Allow", Action: "sqs:SendMessage", Principal: map[string]string{"Service": "sns.amazonaws.com"}, Resource: aws.String(queueArn), Condition: PolicyCondition{"ArnEquals": map[string]string{"aws:SourceArn": topicArn}}, }}, } policyBytes, err := json.Marshal(policyDoc) if err != nil { log.Printf("Couldn't create policy document. Here's why: %v\n", err) return err } _, err = actor.SqsClient.SetQueueAttributes(ctx, &sqs.SetQueueAttributesInput{ Attributes: map[string]string{ string(types.QueueAttributeNamePolicy): string(policyBytes), }, QueueUrl: aws.String(queueUrl), }) if err != nil { log.Printf("Couldn't set send message policy on queue %v. Here's why: %v\n", queueUrl, err) } return err } // PolicyDocument defines a policy document as a Go struct that can be serialized // to JSON. type PolicyDocument struct { Version string Statement []PolicyStatement } // PolicyStatement defines a statement in a policy document. type PolicyStatement struct { Effect string Action string Principal map[string]string `json:",omitempty"` Resource *string `json:",omitempty"` Condition PolicyCondition `json:",omitempty"` } // PolicyCondition defines a condition in a policy. type PolicyCondition map[string]map[string]string // GetMessages uses the ReceiveMessage action to get messages from an Amazon SQS queue. func (actor SqsActions) GetMessages(ctx context.Context, queueUrl string, maxMessages int32, waitTime int32) ([]types.Message, error) { var messages []types.Message result, err := actor.SqsClient.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{ QueueUrl: aws.String(queueUrl), MaxNumberOfMessages: maxMessages, WaitTimeSeconds: waitTime, }) if err != nil { log.Printf("Couldn't get messages from queue %v. 
Here's why: %v\n", queueUrl, err) } else { messages = result.Messages } return messages, err } // DeleteMessages uses the DeleteMessageBatch action to delete a batch of messages from // an Amazon SQS queue. func (actor SqsActions) DeleteMessages(ctx context.Context, queueUrl string, messages []types.Message) error { entries := make([]types.DeleteMessageBatchRequestEntry, len(messages)) for msgIndex := range messages { entries[msgIndex].Id = aws.String(fmt.Sprintf("%v", msgIndex)) entries[msgIndex].ReceiptHandle = messages[msgIndex].ReceiptHandle } _, err := actor.SqsClient.DeleteMessageBatch(ctx, &sqs.DeleteMessageBatchInput{ Entries: entries, QueueUrl: aws.String(queueUrl), }) if err != nil { log.Printf("Couldn't delete messages from queue %v. Here's why: %v\n", queueUrl, err) } return err } // DeleteQueue deletes an Amazon SQS queue. func (actor SqsActions) DeleteQueue(ctx context.Context, queueUrl string) error { _, err := actor.SqsClient.DeleteQueue(ctx, &sqs.DeleteQueueInput{ QueueUrl: aws.String(queueUrl)}) if err != nil { log.Printf("Couldn't delete queue %v. Here's why: %v\n", queueUrl, err) } return err }

Clean up resources.

import ( "context" "fmt" "log" "topics_and_queues/actions" ) // Resources keeps track of AWS resources created during an example and handles // cleanup when the example finishes. type Resources struct { topicArn string queueUrls []string snsActor *actions.SnsActions sqsActor *actions.SqsActions } // Cleanup deletes all AWS resources created during an example. func (resources Resources) Cleanup(ctx context.Context) { defer func() { if r := recover(); r != nil { fmt.Println("Something went wrong during cleanup. Use the AWS Management Console\n" + "to remove any remaining resources that were created for this scenario.") } }() var err error if resources.topicArn != "" { log.Printf("Deleting topic %v.\n", resources.topicArn) err = resources.snsActor.DeleteTopic(ctx, resources.topicArn) if err != nil { panic(err) } } for _, queueUrl := range resources.queueUrls { log.Printf("Deleting queue %v.\n", queueUrl) err = resources.sqsActor.DeleteQueue(ctx, queueUrl) if err != nil { panic(err) } } }
Java
SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

package com.example.sns;

import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.CreateTopicRequest;
import software.amazon.awssdk.services.sns.model.CreateTopicResponse;
import software.amazon.awssdk.services.sns.model.DeleteTopicRequest;
import software.amazon.awssdk.services.sns.model.DeleteTopicResponse;
import software.amazon.awssdk.services.sns.model.MessageAttributeValue;
import software.amazon.awssdk.services.sns.model.PublishRequest;
import software.amazon.awssdk.services.sns.model.PublishResponse;
import software.amazon.awssdk.services.sns.model.SetSubscriptionAttributesRequest;
import software.amazon.awssdk.services.sns.model.SnsException;
import software.amazon.awssdk.services.sns.model.SubscribeRequest;
import software.amazon.awssdk.services.sns.model.SubscribeResponse;
import software.amazon.awssdk.services.sns.model.UnsubscribeRequest;
import software.amazon.awssdk.services.sns.model.UnsubscribeResponse;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
import software.amazon.awssdk.services.sqs.model.DeleteQueueRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesResponse;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.SetQueueAttributesRequest;
import software.amazon.awssdk.services.sqs.model.SqsException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;

/**
 * Before running this Java V2 code example, set up your development
 * environment, including your credentials.
 *
 * For more information, see the following documentation topic:
 *
 * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html
 *
 * This Java example performs these tasks:
 *
 * 1. Gives the user three options to choose from.
 * 2. Creates an Amazon Simple Notification Service (Amazon SNS) topic.
 * 3. Creates an Amazon Simple Queue Service (Amazon SQS) queue.
 * 4. Gets the SQS queue Amazon Resource Name (ARN) attribute.
 * 5. Attaches an AWS Identity and Access Management (IAM) policy to the queue.
 * 6. Subscribes to the SQS queue.
 * 7. Publishes a message to the topic.
 * 8. Displays the messages.
 * 9. Deletes the received message.
 * 10. Unsubscribes from the topic.
 * 11. Deletes the SNS topic.
 */
public class SNSWorkflow {
    public static final String DASHES = new String(new char[80]).replace("\0", "-");

    /**
     * Name of the message attribute used for filtering. The subscription filter
     * policy built in {@link #subQueue} and the attribute attached in
     * {@link #pubMessageFIFO} must use the same name or no message ever matches.
     */
    private static final String TONE_ATTRIBUTE = "tone";

    /**
     * Creates the SNS and SQS clients, runs the interactive workflow, and closes
     * the clients and the console scanner when the workflow returns.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try (SnsClient snsClient = SnsClient.builder()
                .region(Region.US_EAST_1)
                .credentialsProvider(EnvironmentVariableCredentialsProvider.create())
                .build();
                SqsClient sqsClient = SqsClient.builder()
                        .region(Region.US_EAST_1)
                        .credentialsProvider(EnvironmentVariableCredentialsProvider.create())
                        .build();
                Scanner in = new Scanner(System.in)) {
            runWorkflow(snsClient, sqsClient, in);
        }
    }

    /** Walks the user through the eleven workflow steps listed in the class comment. */
    private static void runWorkflow(SnsClient snsClient, SqsClient sqsClient, Scanner in) {
        boolean selectFIFO = false;
        String duplication = "n";
        String deduplicationID = null;
        String groupId = null;
        List<String> filterList = new ArrayList<>();
        String msgAttValue = "";

        System.out.println(DASHES);
        System.out.println("Welcome to messaging with topics and queues.");
        System.out.println("In this workflow, you will create an SNS topic and subscribe an SQS queue to the topic.\n" +
                "You can select from several options for configuring the topic and the subscriptions for the queue.\n" +
                "You can then post to the topic and see the results in the queue.");
        System.out.println(DASHES);

        System.out.println(DASHES);
        System.out.println("SNS topics can be configured as FIFO (First-In-First-Out).\n" +
                "FIFO topics deliver messages in order and support deduplication and message filtering.\n" +
                "Would you like to work with FIFO topics? (y/n)");
        String useFIFO = in.nextLine();
        if ("y".equals(useFIFO)) {
            selectFIFO = true;
            System.out.println("You have selected FIFO");
            System.out.println(" Because you have chosen a FIFO topic, deduplication is supported.\n" +
                    " Deduplication IDs are either set in the message or automatically generated from content using a hash function.\n" +
                    " If a message is successfully published to an SNS FIFO topic, any message published and determined to have the same deduplication ID,\n" +
                    " within the five-minute deduplication interval, is accepted but not delivered.\n" +
                    " For more information about deduplication, see https://docs.aws.amazon.com/sns/latest/dg/fifo-message-dedup.html.");
            System.out.println(
                    "Would you like to use content-based deduplication instead of entering a deduplication ID? (y/n)");
            // Normalize to exactly "y" or "n" so that topic creation and publishing
            // agree on the deduplication mode even when the user types something else.
            duplication = "y".equals(in.nextLine()) ? "y" : "n";
            if ("n".equals(duplication)) {
                System.out.println("Please enter deduplication Id value");
                deduplicationID = in.nextLine();
            }
            System.out.println("Please enter a group id value");
            groupId = in.nextLine();
        }
        System.out.println(DASHES);

        System.out.println(DASHES);
        System.out.println("2. Create a topic.");
        System.out.println("Enter a name for your SNS topic.");
        String topicName = in.nextLine();
        String topicArn;
        if (selectFIFO) {
            System.out.println("Because you have selected a FIFO topic, '.fifo' must be appended to the topic name.");
            topicName = topicName + ".fifo";
            System.out.println("The name of the topic is " + topicName);
            topicArn = createFIFO(snsClient, topicName, duplication);
            System.out.println("The ARN of the FIFO topic is " + topicArn);
        } else {
            System.out.println("The name of the topic is " + topicName);
            topicArn = createSNSTopic(snsClient, topicName);
            System.out.println("The ARN of the non-FIFO topic is " + topicArn);
        }
        System.out.println(DASHES);

        System.out.println(DASHES);
        System.out.println("3. Create an SQS queue.");
        System.out.println("Enter a name for your SQS queue.");
        String sqsQueueName = in.nextLine();
        if (selectFIFO) {
            sqsQueueName = sqsQueueName + ".fifo";
        }
        String sqsQueueUrl = createQueue(sqsClient, sqsQueueName, selectFIFO);
        System.out.println("The queue URL is " + sqsQueueUrl);
        System.out.println(DASHES);

        System.out.println(DASHES);
        System.out.println("4. Get the SQS queue ARN attribute.");
        String sqsQueueArn = getSQSQueueAttrs(sqsClient, sqsQueueUrl);
        System.out.println("The ARN of the new queue is " + sqsQueueArn);
        System.out.println(DASHES);

        System.out.println(DASHES);
        System.out.println("5. Attach an IAM policy to the queue.");
        // The policy is built from the ARNs returned by the services, so it is
        // correct for whatever account and Region the clients are using.
        String policy = buildQueuePolicy(sqsQueueArn, topicArn);
        setQueueAttr(sqsClient, sqsQueueUrl, policy);
        System.out.println(DASHES);

        System.out.println(DASHES);
        System.out.println("6. Subscribe to the SQS queue.");
        if (selectFIFO) {
            System.out.println(
                    "If you add a filter to this subscription, then only the filtered messages will be received in the queue.\n" +
                            "For information about message filtering, see https://docs.aws.amazon.com/sns/latest/dg/sns-message-filtering.html\n" +
                            "For this example, you can filter messages by a \"tone\" attribute.");
            System.out.println("Would you like to filter messages for " + sqsQueueName + "'s subscription to the topic "
                    + topicName + "? (y/n)");
            String filterAns = in.nextLine();
            if ("y".equals(filterAns)) {
                printToneMenu();
                while (true) {
                    System.out.println("Select a number or choose 0 to end.");
                    String tone = toneForChoice(in.nextLine());
                    if (tone.isEmpty()) {
                        break;
                    }
                    if (!filterList.contains(tone)) {
                        filterList.add(tone);
                    }
                }
            }
        }
        String subscriptionArn = subQueue(snsClient, topicArn, sqsQueueArn, filterList);
        System.out.println(DASHES);

        System.out.println(DASHES);
        System.out.println("7. Publish a message to the topic.");
        String message;
        if (selectFIFO) {
            System.out.println("Would you like to add an attribute to this message? (y/n)");
            String msgAns = in.nextLine();
            if ("y".equals(msgAns)) {
                printToneMenu();
                System.out.println("Select a number or choose 0 to end.");
                // Anything other than 1-4 (including the advertised 0) means "no attribute".
                msgAttValue = toneForChoice(in.nextLine());
                if (msgAttValue.isEmpty()) {
                    System.out.println("No attribute selected.");
                } else {
                    System.out.println("Selected value is " + msgAttValue);
                }
            }
            System.out.println("Enter a message.");
            message = in.nextLine();
            pubMessageFIFO(snsClient, message, topicArn, msgAttValue, duplication, groupId, deduplicationID);
        } else {
            System.out.println("Enter a message.");
            message = in.nextLine();
            pubMessage(snsClient, message, topicArn);
        }
        System.out.println(DASHES);

        System.out.println(DASHES);
        System.out.println("8. Display the message. Press any key to continue.");
        in.nextLine();
        List<Message> messageList = receiveMessages(sqsClient, sqsQueueUrl, msgAttValue);
        for (Message mes : messageList) {
            System.out.println("Message Id: " + mes.messageId());
            System.out.println("Full Message: " + mes.body());
        }
        System.out.println(DASHES);

        System.out.println(DASHES);
        System.out.println("9. Delete the received message. Press any key to continue.");
        in.nextLine();
        deleteMessages(sqsClient, sqsQueueUrl, messageList);
        System.out.println(DASHES);

        System.out.println(DASHES);
        System.out.println("10. Unsubscribe from the topic and delete the queue. Press any key to continue.");
        in.nextLine();
        unSub(snsClient, subscriptionArn);
        deleteSQSQueue(sqsClient, sqsQueueName);
        System.out.println(DASHES);

        System.out.println(DASHES);
        System.out.println("11. Delete the topic. Press any key to continue.");
        in.nextLine();
        deleteSNSTopic(snsClient, topicArn);
        System.out.println(DASHES);

        System.out.println("The SNS/SQS workflow has completed successfully.");
        System.out.println(DASHES);
    }

    /** Prints the numbered list of tone values the user can choose from. */
    private static void printToneMenu() {
        System.out.println("You can filter messages by one or more of the following \"tone\" attributes.");
        System.out.println("1. cheerful");
        System.out.println("2. funny");
        System.out.println("3. serious");
        System.out.println("4. sincere");
    }

    /** Maps a menu answer ("1".."4") to its tone; any other answer yields the empty string. */
    private static String toneForChoice(String choice) {
        switch (choice) {
            case "1":
                return "cheerful";
            case "2":
                return "funny";
            case "3":
                return "serious";
            case "4":
                return "sincere";
            default:
                return "";
        }
    }

    /**
     * Builds the queue access policy that lets Amazon SNS send messages to the
     * given queue, restricted to deliveries that originate from the given topic.
     */
    private static String buildQueuePolicy(String queueArn, String topicArn) {
        return "{\n" +
                "  \"Statement\": [\n" +
                "    {\n" +
                "      \"Effect\": \"Allow\",\n" +
                "      \"Principal\": {\n" +
                "        \"Service\": \"sns.amazonaws.com\"\n" +
                "      },\n" +
                "      \"Action\": \"sqs:SendMessage\",\n" +
                "      \"Resource\": \"" + queueArn + "\",\n" +
                "      \"Condition\": {\n" +
                "        \"ArnEquals\": {\n" +
                "          \"aws:SourceArn\": \"" + topicArn + "\"\n" +
                "        }\n" +
                "      }\n" +
                "    }\n" +
                "  ]\n" +
                "}";
    }

    /**
     * Deletes the SNS topic with the given ARN and prints the HTTP status.
     * Exits the JVM with status 1 if the service reports an error.
     */
    public static void deleteSNSTopic(SnsClient snsClient, String topicArn) {
        try {
            DeleteTopicRequest request = DeleteTopicRequest.builder()
                    .topicArn(topicArn)
                    .build();

            DeleteTopicResponse result = snsClient.deleteTopic(request);
            System.out.println("Status was " + result.sdkHttpResponse().statusCode());

        } catch (SnsException e) {
            System.err.println(e.awsErrorDetails().errorMessage());
            System.exit(1);
        }
    }

    /**
     * Looks up the URL of the named queue and deletes the queue.
     * Exits the JVM with status 1 if the service reports an error.
     */
    public static void deleteSQSQueue(SqsClient sqsClient, String queueName) {
        try {
            GetQueueUrlRequest getQueueRequest = GetQueueUrlRequest.builder()
                    .queueName(queueName)
                    .build();

            String queueUrl = sqsClient.getQueueUrl(getQueueRequest).queueUrl();
            DeleteQueueRequest deleteQueueRequest = DeleteQueueRequest.builder()
                    .queueUrl(queueUrl)
                    .build();

            sqsClient.deleteQueue(deleteQueueRequest);
            System.out.println(queueName + " was successfully deleted.");

        } catch (SqsException e) {
            System.err.println(e.awsErrorDetails().errorMessage());
            System.exit(1);
        }
    }

    /**
     * Removes the subscription with the given ARN and prints the HTTP status.
     * Exits the JVM with status 1 if the service reports an error.
     */
    public static void unSub(SnsClient snsClient, String subscriptionArn) {
        try {
            UnsubscribeRequest request = UnsubscribeRequest.builder()
                    .subscriptionArn(subscriptionArn)
                    .build();

            UnsubscribeResponse result = snsClient.unsubscribe(request);
            System.out.println("Status was " + result.sdkHttpResponse().statusCode()
                    + "\nSubscription was removed for " + request.subscriptionArn());

        } catch (SnsException e) {
            System.err.println(e.awsErrorDetails().errorMessage());
            System.exit(1);
        }
    }

    /**
     * Deletes the given messages from the queue in a single batch.
     *
     * Each batch entry carries the message's receipt handle, which is what SQS
     * uses to identify the delivery to delete; the message ID only serves as the
     * entry's identifier within the batch. Nothing is sent when the list is null
     * or empty, because SQS rejects an empty batch. Entries the service could not
     * delete are reported on standard error.
     *
     * @param messages messages previously returned by {@link #receiveMessages}
     */
    public static void deleteMessages(SqsClient sqsClient, String queueUrl, List<Message> messages) {
        if (messages == null || messages.isEmpty()) {
            System.out.println("There are no messages to delete.");
            return;
        }

        try {
            List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
            for (Message msg : messages) {
                DeleteMessageBatchRequestEntry entry = DeleteMessageBatchRequestEntry.builder()
                        .id(msg.messageId())
                        .receiptHandle(msg.receiptHandle())
                        .build();
                entries.add(entry);
            }

            DeleteMessageBatchRequest deleteMessageBatchRequest = DeleteMessageBatchRequest.builder()
                    .queueUrl(queueUrl)
                    .entries(entries)
                    .build();

            DeleteMessageBatchResponse response = sqsClient.deleteMessageBatch(deleteMessageBatchRequest);
            // A successful call can still contain per-entry failures.
            if (response.failed().isEmpty()) {
                System.out.println("The batch delete of messages was successful");
            } else {
                response.failed().forEach(failure -> System.err.println(
                        "Could not delete message " + failure.id() + ": " + failure.message()));
            }

        } catch (SqsException e) {
            System.err.println(e.awsErrorDetails().errorMessage());
            System.exit(1);
        }
    }

    /**
     * Receives up to five messages from the queue.
     *
     * @param msgAttValue the tone attached to the published message, or empty when
     *                    none was attached; when non-empty the "tone" message
     *                    attribute is requested along with the message
     * @return the received messages; never null
     */
    public static List<Message> receiveMessages(SqsClient sqsClient, String queueUrl, String msgAttValue) {
        try {
            ReceiveMessageRequest.Builder requestBuilder = ReceiveMessageRequest.builder()
                    .queueUrl(queueUrl)
                    .maxNumberOfMessages(5);
            if (msgAttValue != null && !msgAttValue.isEmpty()) {
                requestBuilder.messageAttributeNames(TONE_ATTRIBUTE);
            }
            return sqsClient.receiveMessage(requestBuilder.build()).messages();

        } catch (SqsException e) {
            System.err.println(e.awsErrorDetails().errorMessage());
            System.exit(1);
        }
        return new ArrayList<>();
    }

    /**
     * Publishes a message to a non-FIFO topic and prints the message ID and HTTP status.
     * Exits the JVM with status 1 if the service reports an error.
     */
    public static void pubMessage(SnsClient snsClient, String message, String topicArn) {
        try {
            PublishRequest request = PublishRequest.builder()
                    .message(message)
                    .topicArn(topicArn)
                    .build();

            PublishResponse result = snsClient.publish(request);
            System.out
                    .println(result.messageId() + " Message sent. Status is " + result.sdkHttpResponse().statusCode());

        } catch (SnsException e) {
            System.err.println(e.awsErrorDetails().errorMessage());
            System.exit(1);
        }
    }

    /**
     * Publishes a message to a FIFO topic.
     *
     * @param msgAttValue     tone to attach as the "tone" message attribute, or
     *                        empty to publish without attributes
     * @param duplication     "y" when the topic uses content-based deduplication;
     *                        any other value sends {@code deduplicationID} explicitly
     * @param groupId         message group ID
     * @param deduplicationID deduplication ID, used only when {@code duplication}
     *                        is not "y"
     */
    public static void pubMessageFIFO(SnsClient snsClient,
            String message,
            String topicArn,
            String msgAttValue,
            String duplication,
            String groupId,
            String deduplicationID) {

        try {
            PublishRequest.Builder requestBuilder = PublishRequest.builder()
                    .topicArn(topicArn)
                    .message(message)
                    .messageGroupId(groupId);

            if (!"y".equals(duplication)) {
                requestBuilder.messageDeduplicationId(deduplicationID);
            }

            // Attach the attribute in both deduplication modes, under the same
            // name the subscription filter policy matches on.
            if (msgAttValue != null && !msgAttValue.isEmpty()) {
                Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
                messageAttributes.put(TONE_ATTRIBUTE, MessageAttributeValue.builder()
                        .dataType("String")
                        .stringValue(msgAttValue)
                        .build());
                requestBuilder.messageAttributes(messageAttributes);
            }

            // Publish the message to the topic.
            PublishResponse result = snsClient.publish(requestBuilder.build());
            System.out
                    .println(result.messageId() + " Message sent. Status was " + result.sdkHttpResponse().statusCode());

        } catch (SnsException e) {
            System.err.println(e.awsErrorDetails().errorMessage());
            System.exit(1);
        }
    }

    /**
     * Subscribes the queue to the topic and, when {@code filterList} is not
     * empty, sets a filter policy of the form {@code {"tone": [...]}} on the new
     * subscription.
     *
     * @return the subscription ARN
     */
    public static String subQueue(SnsClient snsClient, String topicArn, String queueArn, List<String> filterList) {
        try {
            SubscribeRequest request = SubscribeRequest.builder()
                    .protocol("sqs")
                    .endpoint(queueArn)
                    .returnSubscriptionArn(true)
                    .topicArn(topicArn)
                    .build();

            SubscribeResponse result = snsClient.subscribe(request);
            String subscriptionArn = result.subscriptionArn();
            System.out.println("The queue " + queueArn + " has been subscribed to the topic " + topicArn + "\n" +
                    "with the subscription ARN " + subscriptionArn);

            if (filterList != null && !filterList.isEmpty()) {
                JsonArray toneArray = new JsonArray();
                for (String value : filterList) {
                    toneArray.add(new JsonPrimitive(value));
                }
                JsonObject filterPolicy = new JsonObject();
                filterPolicy.add(TONE_ATTRIBUTE, toneArray);

                String filterPolicyJson = new Gson().toJson(filterPolicy);
                System.out.println(filterPolicyJson);

                SetSubscriptionAttributesRequest attRequest = SetSubscriptionAttributesRequest.builder()
                        .subscriptionArn(subscriptionArn)
                        .attributeName("FilterPolicy")
                        .attributeValue(filterPolicyJson)
                        .build();

                snsClient.setSubscriptionAttributes(attRequest);
            }
            return subscriptionArn;

        } catch (SnsException e) {
            System.err.println(e.awsErrorDetails().errorMessage());
            System.exit(1);
        }
        return "";
    }

    /**
     * Attaches an access policy to the queue.
     * Exits the JVM with status 1 if Amazon SQS reports an error.
     */
    public static void setQueueAttr(SqsClient sqsClient, String queueUrl, String policy) {
        try {
            Map<QueueAttributeName, String> attrMap = new HashMap<>();
            attrMap.put(QueueAttributeName.POLICY, policy);

            SetQueueAttributesRequest attributesRequest = SetQueueAttributesRequest.builder()
                    .queueUrl(queueUrl)
                    .attributes(attrMap)
                    .build();

            sqsClient.setQueueAttributes(attributesRequest);
            System.out.println("The policy has been successfully attached.");

        } catch (SqsException e) {
            // This is an SQS call, so SqsException is the type that can be raised here.
            System.err.println(e.awsErrorDetails().errorMessage());
            System.exit(1);
        }
    }

    /**
     * Returns the ARN of the queue at the given URL, or the empty string if the
     * response does not contain one.
     * Exits the JVM with status 1 if the service reports an error.
     */
    public static String getSQSQueueAttrs(SqsClient sqsClient, String queueUrl) {
        try {
            // Only the queue ARN is requested.
            List<QueueAttributeName> atts = new ArrayList<>();
            atts.add(QueueAttributeName.QUEUE_ARN);

            GetQueueAttributesRequest attributesRequest = GetQueueAttributesRequest.builder()
                    .queueUrl(queueUrl)
                    .attributeNames(atts)
                    .build();

            GetQueueAttributesResponse response = sqsClient.getQueueAttributes(attributesRequest);
            String queueArn = response.attributes().get(QueueAttributeName.QUEUE_ARN);
            return queueArn != null ? queueArn : "";

        } catch (SqsException e) {
            System.err.println(e.awsErrorDetails().errorMessage());
            System.exit(1);
        }
        return "";
    }

    /**
     * Creates a queue and returns its URL.
     *
     * @param selectFIFO when true the queue is created with the FifoQueue
     *                   attribute; a null value is treated as false
     */
    public static String createQueue(SqsClient sqsClient, String queueName, Boolean selectFIFO) {
        try {
            System.out.println("\nCreate Queue");

            CreateQueueRequest.Builder requestBuilder = CreateQueueRequest.builder()
                    .queueName(queueName);
            if (Boolean.TRUE.equals(selectFIFO)) {
                Map<QueueAttributeName, String> attrs = new HashMap<>();
                attrs.put(QueueAttributeName.FIFO_QUEUE, "true");
                requestBuilder.attributes(attrs);
            }
            sqsClient.createQueue(requestBuilder.build());

            System.out.println("\nGet queue url");
            GetQueueUrlResponse getQueueUrlResponse = sqsClient
                    .getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build());
            return getQueueUrlResponse.queueUrl();

        } catch (SqsException e) {
            System.err.println(e.awsErrorDetails().errorMessage());
            System.exit(1);
        }
        return "";
    }

    /**
     * Creates a non-FIFO topic and returns its ARN.
     * Exits the JVM with status 1 if the service reports an error.
     */
    public static String createSNSTopic(SnsClient snsClient, String topicName) {
        try {
            CreateTopicRequest request = CreateTopicRequest.builder()
                    .name(topicName)
                    .build();

            CreateTopicResponse result = snsClient.createTopic(request);
            return result.topicArn();

        } catch (SnsException e) {
            System.err.println(e.awsErrorDetails().errorMessage());
            System.exit(1);
        }
        return "";
    }

    /**
     * Creates a FIFO topic and returns its ARN.
     *
     * @param duplication "n" disables content-based deduplication; any other
     *                    value enables it
     */
    public static String createFIFO(SnsClient snsClient, String topicName, String duplication) {
        try {
            boolean contentBasedDeduplication = !"n".equals(duplication);

            Map<String, String> topicAttributes = new HashMap<>();
            topicAttributes.put("FifoTopic", "true");
            topicAttributes.put("ContentBasedDeduplication", String.valueOf(contentBasedDeduplication));

            CreateTopicRequest topicRequest = CreateTopicRequest.builder()
                    .name(topicName)
                    .attributes(topicAttributes)
                    .build();

            CreateTopicResponse response = snsClient.createTopic(topicRequest);
            return response.topicArn();

        } catch (SnsException e) {
            System.err.println(e.awsErrorDetails().errorMessage());
            System.exit(1);
        }
        return "";
    }
}
JavaScript
SDK for JavaScript (v3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

This is the entry point for this workflow.

import { SNSClient } from "@aws-sdk/client-sns";
import { SQSClient } from "@aws-sdk/client-sqs";

import { TopicsQueuesWkflw } from "./TopicsQueuesWkflw.js";
import { Prompter } from "@aws-doc-sdk-examples/lib/prompter.js";

// The workflow calls both `logger.log(...)` and `logger.logSeparator(...)`.
// The global `console` has no `logSeparator`, so passing it directly makes the
// workflow throw on its first header. This adapter supplies both methods on
// top of `console`.
// NOTE(review): the separator layout below is a plain dashed rule chosen for
// this adapter; confirm against the project's own Logger if one is available.
const consoleLogger = {
  log: (...args) => console.log(...args),
  logSeparator: (message) => {
    const rule = "-".repeat(80);
    console.log(message ? `${rule}\n${message}\n${rule}` : rule);
  },
};

/**
 * Entry point: builds the SNS/SQS clients, the prompter and the logger, hands
 * them to the workflow, and starts it.
 *
 * @returns {Promise<void>} settles when the workflow, including its resource
 *   cleanup, has finished. Callers that ignore the return value behave as before.
 */
export const startSnsWorkflow = () => {
  const snsClient = new SNSClient({});
  const sqsClient = new SQSClient({});
  const prompter = new Prompter();

  const wkflw = new TopicsQueuesWkflw(
    snsClient,
    sqsClient,
    prompter,
    consoleLogger,
  );

  return wkflw.start();
};

The preceding code provides the necessary dependencies and starts the workflow. The next section contains the bulk of the example.

// Values offered both as subscription filter tones and as message attributes.
const toneChoices = [
  { name: "cheerful", value: "cheerful" },
  { name: "funny", value: "funny" },
  { name: "serious", value: "serious" },
  { name: "sincere", value: "sincere" },
];

/**
 * Interactive workflow: create a topic and queues, allow SNS to deliver to the
 * queues, subscribe them, publish messages, poll the queues, and finally
 * delete everything that was created.
 */
export class TopicsQueuesWkflw {
  // SNS topic is configured as First-In-First-Out
  isFifo = true;

  // Automatic content-based deduplication is enabled.
  autoDedup = false;

  snsClient;
  sqsClient;
  topicName;
  topicArn;
  subscriptionArns = [];
  /**
   * @type {{ queueName: string, queueArn: string, queueUrl: string, policy?: string }[]}
   */
  queues = [];
  prompter;

  /**
   * @param {import('@aws-sdk/client-sns').SNSClient} snsClient
   * @param {import('@aws-sdk/client-sqs').SQSClient} sqsClient
   * @param {import('../../libs/prompter.js').Prompter} prompter
   * @param {import('../../libs/logger.js').Logger} logger
   */
  constructor(snsClient, sqsClient, prompter, logger) {
    this.snsClient = snsClient;
    this.sqsClient = sqsClient;
    this.prompter = prompter;
    this.logger = logger;
  }

  /** Logs the workflow description. */
  async welcome() {
    await this.logger.log(MESSAGES.description);
  }

  /**
   * Asks whether to use a FIFO topic and, if so, whether to use content-based
   * deduplication. Stores the answers in `isFifo` and `autoDedup`.
   */
  async confirmFifo() {
    await this.logger.log(MESSAGES.snsFifoDescription);
    this.isFifo = await this.prompter.confirm({
      message: MESSAGES.snsFifoPrompt,
    });

    if (this.isFifo) {
      this.logger.logSeparator(MESSAGES.headerDedup);
      await this.logger.log(MESSAGES.deduplicationNotice);
      await this.logger.log(MESSAGES.deduplicationDescription);
      this.autoDedup = await this.prompter.confirm({
        message: MESSAGES.deduplicationPrompt,
      });
    }
  }

  /**
   * Prompts for a topic name (appending ".fifo" for FIFO topics), creates the
   * topic, and stores its name and ARN.
   */
  async createTopic() {
    await this.logger.log(MESSAGES.creatingTopics);
    this.topicName = await this.prompter.input({
      message: MESSAGES.topicNamePrompt,
    });
    if (this.isFifo) {
      this.topicName += ".fifo";
      this.logger.logSeparator(MESSAGES.headerFifoNaming);
      await this.logger.log(MESSAGES.appendFifoNotice);
    }

    const response = await this.snsClient.send(
      new CreateTopicCommand({
        Name: this.topicName,
        Attributes: {
          FifoTopic: this.isFifo ? "true" : "false",
          ...(this.autoDedup ? { ContentBasedDeduplication: "true" } : {}),
        },
      }),
    );

    this.topicArn = response.TopicArn;

    await this.logger.log(
      MESSAGES.topicCreatedNotice
        .replace("${TOPIC_NAME}", this.topicName)
        .replace("${TOPIC_ARN}", this.topicArn),
    );
  }

  /**
   * Prompts for queue names, creates each queue, looks up its ARN, and records
   * name, ARN and URL in `queues`.
   */
  async createQueues() {
    await this.logger.log(MESSAGES.createQueuesNotice);
    // Increase this number to add more queues.
    const maxQueues = 2;

    for (let i = 0; i < maxQueues; i++) {
      await this.logger.log(
        MESSAGES.queueCount.replace("${COUNT}", String(i + 1)),
      );
      let queueName = await this.prompter.input({
        message: MESSAGES.queueNamePrompt.replace(
          "${EXAMPLE_NAME}",
          i === 0 ? "good-news" : "bad-news",
        ),
      });

      if (this.isFifo) {
        queueName += ".fifo";
        await this.logger.log(MESSAGES.appendFifoNotice);
      }

      const response = await this.sqsClient.send(
        new CreateQueueCommand({
          QueueName: queueName,
          Attributes: { ...(this.isFifo ? { FifoQueue: "true" } : {}) },
        }),
      );

      const { Attributes } = await this.sqsClient.send(
        new GetQueueAttributesCommand({
          QueueUrl: response.QueueUrl,
          AttributeNames: ["QueueArn"],
        }),
      );

      this.queues.push({
        queueName,
        queueArn: Attributes.QueueArn,
        queueUrl: response.QueueUrl,
      });

      await this.logger.log(
        MESSAGES.queueCreatedNotice
          .replace("${QUEUE_NAME}", queueName)
          .replace("${QUEUE_URL}", response.QueueUrl)
          .replace("${QUEUE_ARN}", Attributes.QueueArn),
      );
    }
  }

  /**
   * For each queue, shows the access policy that lets the topic send messages
   * to it and attaches the policy if the user confirms.
   */
  async attachQueueIamPolicies() {
    for (const [index, queue] of this.queues.entries()) {
      const policy = JSON.stringify(
        {
          Statement: [
            {
              Effect: "Allow",
              Principal: {
                // Service principal for Amazon SNS.
                Service: "sns.amazonaws.com",
              },
              Action: "sqs:SendMessage",
              Resource: queue.queueArn,
              Condition: {
                ArnEquals: {
                  "aws:SourceArn": this.topicArn,
                },
              },
            },
          ],
        },
        null,
        2,
      );

      if (index !== 0) {
        this.logger.logSeparator();
      }

      await this.logger.log(MESSAGES.attachPolicyNotice);
      console.log(policy);
      const addPolicy = await this.prompter.confirm({
        message: MESSAGES.addPolicyConfirmation.replace(
          "${QUEUE_NAME}",
          queue.queueName,
        ),
      });

      if (addPolicy) {
        await this.sqsClient.send(
          new SetQueueAttributesCommand({
            QueueUrl: queue.queueUrl,
            Attributes: {
              Policy: policy,
            },
          }),
        );
        queue.policy = policy;
      } else {
        await this.logger.log(
          MESSAGES.policyNotAttachedNotice.replace(
            "${QUEUE_NAME}",
            queue.queueName,
          ),
        );
      }
    }
  }

  /**
   * Subscribes every queue to the topic. For FIFO topics the user may pick
   * tones; when any are picked they become the subscription's filter policy.
   */
  async subscribeQueuesToTopic() {
    for (const [index, queue] of this.queues.entries()) {
      /**
       * @type {import('@aws-sdk/client-sns').SubscribeCommandInput}
       */
      const subscribeParams = {
        TopicArn: this.topicArn,
        Protocol: "sqs",
        Endpoint: queue.queueArn,
      };
      let tones = [];

      if (this.isFifo) {
        if (index === 0) {
          await this.logger.log(MESSAGES.fifoFilterNotice);
        }
        tones = await this.prompter.checkbox({
          message: MESSAGES.fifoFilterSelect.replace(
            "${QUEUE_NAME}",
            queue.queueName,
          ),
          choices: toneChoices,
        });

        if (tones.length) {
          subscribeParams.Attributes = {
            FilterPolicyScope: "MessageAttributes",
            FilterPolicy: JSON.stringify({
              tone: tones,
            }),
          };
        }
      }

      const { SubscriptionArn } = await this.snsClient.send(
        new SubscribeCommand(subscribeParams),
      );

      this.subscriptionArns.push(SubscriptionArn);

      await this.logger.log(
        MESSAGES.queueSubscribedNotice
          .replace("${QUEUE_NAME}", queue.queueName)
          .replace("${TOPIC_NAME}", this.topicName)
          .replace("${TONES}", tones.length ? tones.join(", ") : "none"),
      );
    }
  }

  /**
   * Prompts for and publishes messages until the user declines to publish
   * another. For FIFO topics it also prompts for the group ID, the
   * deduplication ID (unless content-based deduplication is on) and tones.
   */
  async publishMessages() {
    let publishAnother = true;

    // Iterative rather than recursive so a long session does not deepen the
    // chain of pending calls.
    while (publishAnother) {
      const message = await this.prompter.input({
        message: MESSAGES.publishMessagePrompt,
      });

      let groupId;
      let deduplicationId;
      let choices;

      if (this.isFifo) {
        await this.logger.log(MESSAGES.groupIdNotice);
        groupId = await this.prompter.input({
          message: MESSAGES.groupIdPrompt,
        });

        if (this.autoDedup === false) {
          await this.logger.log(MESSAGES.deduplicationIdNotice);
          deduplicationId = await this.prompter.input({
            message: MESSAGES.deduplicationIdPrompt,
          });
        }

        choices = await this.prompter.checkbox({
          message: MESSAGES.messageAttributesPrompt,
          choices: toneChoices,
        });
      }

      await this.snsClient.send(
        new PublishCommand({
          TopicArn: this.topicArn,
          Message: message,
          ...(groupId
            ? {
                MessageGroupId: groupId,
              }
            : {}),
          ...(deduplicationId
            ? {
                MessageDeduplicationId: deduplicationId,
              }
            : {}),
          // An empty array is truthy, so test the length: with no tones
          // selected the message is published without a "tone" attribute.
          ...(choices?.length
            ? {
                MessageAttributes: {
                  tone: {
                    DataType: "String.Array",
                    StringValue: JSON.stringify(choices),
                  },
                },
              }
            : {}),
        }),
      );

      publishAnother = await this.prompter.confirm({
        message: MESSAGES.publishAnother,
      });
    }
  }

  /**
   * Polls each queue once, prints and batch-deletes whatever was received, and
   * repeats for as long as the user confirms.
   */
  async receiveAndDeleteMessages() {
    let pollAgain = true;

    while (pollAgain) {
      for (const queue of this.queues) {
        const { Messages } = await this.sqsClient.send(
          new ReceiveMessageCommand({
            QueueUrl: queue.queueUrl,
          }),
        );

        if (Messages) {
          await this.logger.log(
            MESSAGES.messagesReceivedNotice.replace(
              "${QUEUE_NAME}",
              queue.queueName,
            ),
          );
          console.log(Messages);

          await this.sqsClient.send(
            new DeleteMessageBatchCommand({
              QueueUrl: queue.queueUrl,
              Entries: Messages.map((message) => ({
                Id: message.MessageId,
                ReceiptHandle: message.ReceiptHandle,
              })),
            }),
          );
        } else {
          await this.logger.log(
            MESSAGES.noMessagesReceivedNotice.replace(
              "${QUEUE_NAME}",
              queue.queueName,
            ),
          );
        }
      }

      pollAgain = await this.prompter.confirm({
        message: MESSAGES.deleteAndPollConfirmation,
      });
    }
  }

  /**
   * Removes every recorded subscription, deletes every recorded queue, and
   * deletes the topic if one was created.
   */
  async destroyResources() {
    for (const subscriptionArn of this.subscriptionArns) {
      await this.snsClient.send(
        new UnsubscribeCommand({ SubscriptionArn: subscriptionArn }),
      );
    }

    for (const queue of this.queues) {
      await this.sqsClient.send(
        new DeleteQueueCommand({ QueueUrl: queue.queueUrl }),
      );
    }

    if (this.topicArn) {
      await this.snsClient.send(
        new DeleteTopicCommand({ TopicArn: this.topicArn }),
      );
    }
  }

  /**
   * Runs the workflow steps in order. Any error is printed, and resources
   * recorded so far are destroyed whether or not the steps succeeded.
   */
  async start() {
    console.clear();

    try {
      this.logger.logSeparator(MESSAGES.headerWelcome);
      await this.welcome();
      this.logger.logSeparator(MESSAGES.headerFifo);
      await this.confirmFifo();
      this.logger.logSeparator(MESSAGES.headerCreateTopic);
      await this.createTopic();
      this.logger.logSeparator(MESSAGES.headerCreateQueues);
      await this.createQueues();
      this.logger.logSeparator(MESSAGES.headerAttachPolicy);
      await this.attachQueueIamPolicies();
      this.logger.logSeparator(MESSAGES.headerSubscribeQueues);
      await this.subscribeQueuesToTopic();
      this.logger.logSeparator(MESSAGES.headerPublishMessage);
      await this.publishMessages();
      this.logger.logSeparator(MESSAGES.headerReceiveMessages);
      await this.receiveAndDeleteMessages();
    } catch (err) {
      console.error(err);
    } finally {
      await this.destroyResources();
    }
  }
}
Kotlin
SDK for Kotlin
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

package com.example.sns

import aws.sdk.kotlin.services.sns.SnsClient
import aws.sdk.kotlin.services.sns.model.CreateTopicRequest
import aws.sdk.kotlin.services.sns.model.DeleteTopicRequest
import aws.sdk.kotlin.services.sns.model.PublishRequest
import aws.sdk.kotlin.services.sns.model.SetSubscriptionAttributesRequest
import aws.sdk.kotlin.services.sns.model.SubscribeRequest
import aws.sdk.kotlin.services.sns.model.UnsubscribeRequest
import aws.sdk.kotlin.services.sqs.SqsClient
import aws.sdk.kotlin.services.sqs.model.CreateQueueRequest
import aws.sdk.kotlin.services.sqs.model.DeleteMessageBatchRequest
import aws.sdk.kotlin.services.sqs.model.DeleteMessageBatchRequestEntry
import aws.sdk.kotlin.services.sqs.model.DeleteQueueRequest
import aws.sdk.kotlin.services.sqs.model.GetQueueAttributesRequest
import aws.sdk.kotlin.services.sqs.model.GetQueueUrlRequest
import aws.sdk.kotlin.services.sqs.model.Message
import aws.sdk.kotlin.services.sqs.model.QueueAttributeName
import aws.sdk.kotlin.services.sqs.model.ReceiveMessageRequest
import aws.sdk.kotlin.services.sqs.model.SetQueueAttributesRequest
import com.google.gson.Gson
import com.google.gson.JsonObject
import com.google.gson.JsonPrimitive
import java.util.Scanner

/**
 Before running this Kotlin code example, set up your development environment,
 including your AWS credentials.

 For more information, see the following documentation topic:
 https://docs.aws.amazon.com/sdk-for-kotlin/latest/developer-guide/setup.html

 This Kotlin example performs the following tasks:

 1. Gives the user three options to choose from.
 2. Creates an Amazon Simple Notification Service (Amazon SNS) topic.
 3. Creates an Amazon Simple Queue Service (Amazon SQS) queue.
 4. Gets the SQS queue Amazon Resource Name (ARN) attribute.
 5. Attaches an AWS Identity and Access Management (IAM) policy to the queue.
 6. Subscribes to the SQS queue.
 7. Publishes a message to the topic.
 8. Displays the messages.
 9. Deletes the received message.
 10. Unsubscribes from the topic.
 11. Deletes the SNS topic.
 */
val DASHES: String = "-".repeat(80)

/**
 * Runs the interactive SNS/SQS workflow: asks the user how to configure the
 * topic, creates the topic and queue, wires them together, publishes one
 * message, shows and deletes what the queue received, then removes the
 * subscription, queue, and topic.
 */
suspend fun main() {
    val input = Scanner(System.`in`)

    // "y" means content-based deduplication; anything else means the user
    // supplies an explicit deduplication ID (FIFO only).
    var duplication = "n"
    var deduplicationID: String? = null
    var groupId: String? = null
    var selectFIFO = false
    val filterList = ArrayList<String>()
    var msgAttValue = ""

    println(DASHES)
    println("Welcome to the AWS SDK for Kotlin messaging with topics and queues.")
    println(
        """
        In this workflow, you will create an SNS topic and subscribe an SQS queue to the topic.
        You can select from several options for configuring the topic and the subscriptions for the queue.
        You can then post to the topic and see the results in the queue.
        """.trimIndent(),
    )
    println(DASHES)

    println(DASHES)
    println(
        """
        SNS topics can be configured as FIFO (First-In-First-Out).
        FIFO topics deliver messages in order and support deduplication and message filtering.
        Would you like to work with FIFO topics? (y/n)
        """.trimIndent(),
    )
    val useFIFO = input.nextLine()
    if (useFIFO == "y") {
        selectFIFO = true
        println("You have selected FIFO")
        println(
            """
            Because you have chosen a FIFO topic, deduplication is supported.
            Deduplication IDs are either set in the message or automatically generated from content using a hash function.
            If a message is successfully published to an SNS FIFO topic, any message published and determined to have the same deduplication ID,
            within the five-minute deduplication interval, is accepted but not delivered.
            For more information about deduplication, see https://docs.aws.amazon.com/sns/latest/dg/fifo-message-dedup.html.
            """.trimIndent(),
        )

        println("Would you like to use content-based deduplication instead of entering a deduplication ID? (y/n)")
        duplication = input.nextLine()
        if (duplication != "y") {
            // An explicit ID is only needed when SNS is not hashing the content.
            println("Enter deduplication Id value")
            deduplicationID = input.nextLine()
        }
        println("Enter a group id value")
        groupId = input.nextLine()
    }
    println(DASHES)

    println(DASHES)
    println("2. Create a topic.")
    println("Enter a name for your SNS topic.")
    var topicName = input.nextLine()
    val topicArn: String?
    if (selectFIFO) {
        println("Because you have selected a FIFO topic, '.fifo' must be appended to the topic name.")
        topicName = "$topicName.fifo"
        println("The name of the topic is $topicName")
        topicArn = createFIFO(topicName, duplication)
        println("The ARN of the FIFO topic is $topicArn")
    } else {
        println("The name of the topic is $topicName")
        topicArn = createSNSTopic(topicName)
        println("The ARN of the non-FIFO topic is $topicArn")
    }
    println(DASHES)

    println(DASHES)
    println("3. Create an SQS queue.")
    println("Enter a name for your SQS queue.")
    var sqsQueueName = input.nextLine()
    if (selectFIFO) {
        sqsQueueName = "$sqsQueueName.fifo"
    }
    val sqsQueueUrl = createQueue(sqsQueueName, selectFIFO)
    println("The queue URL is $sqsQueueUrl")
    println(DASHES)

    println(DASHES)
    println("4. Get the SQS queue ARN attribute.")
    val sqsQueueArn = getSQSQueueAttrs(sqsQueueUrl)
    println("The ARN of the new queue is $sqsQueueArn")
    println(DASHES)

    println(DASHES)
    println("5. Attach an IAM policy to the queue.")

    // Allow the SNS service principal to send to this queue, but only on
    // behalf of the topic created above.
    val policy =
        """
        {
             "Statement": [
             {
                 "Effect": "Allow",
                         "Principal": {
                     "Service": "sns.amazonaws.com"
                 },
                 "Action": "sqs:SendMessage",
                         "Resource": "$sqsQueueArn",
                         "Condition": {
                     "ArnEquals": {
                         "aws:SourceArn": "$topicArn"
                     }
                 }
             }
             ]
        }
        """.trimIndent()
    setQueueAttr(sqsQueueUrl, policy)
    println(DASHES)

    println(DASHES)
    println("6. Subscribe to the SQS queue.")
    if (selectFIFO) {
        println(
            """
            If you add a filter to this subscription, then only the filtered messages will be received in the queue.
            For information about message filtering, see https://docs.aws.amazon.com/sns/latest/dg/sns-message-filtering.html
            For this example, you can filter messages by a "tone" attribute.
            """.trimIndent(),
        )
        println("Would you like to filter messages for $sqsQueueName's subscription to the topic $topicName?  (y/n)")
        val filterAns = input.nextLine()
        if (filterAns == "y") {
            println("You can filter messages by using one or more of the following \"tone\" attributes.")
            println("1. cheerful")
            println("2. funny")
            println("3. serious")
            println("4. sincere")
            var done = false
            while (!done) {
                println("Select a number or choose 0 to end.")
                when (input.nextLine()) {
                    "1" -> filterList.add("cheerful")
                    "2" -> filterList.add("funny")
                    "3" -> filterList.add("serious")
                    "4" -> filterList.add("sincere")
                    else -> done = true
                }
            }
        }
    }
    val subscriptionArn = subQueue(topicArn, sqsQueueArn, filterList)
    println(DASHES)

    println(DASHES)
    println("7. Publish a message to the topic.")
    val message: String
    if (selectFIFO) {
        println("Would you like to add an attribute to this message? (y/n)")
        val msgAns = input.nextLine()
        if (msgAns == "y") {
            println("You can filter messages by one or more of the following \"tone\" attributes.")
            println("1. cheerful")
            println("2. funny")
            println("3. serious")
            println("4. sincere")
            println("Select a number or choose 0 to end.")
            msgAttValue =
                when (input.nextLine()) {
                    "1" -> "cheerful"
                    "2" -> "funny"
                    "3" -> "serious"
                    "4" -> "sincere"
                    // 0 (or any other input) means "no attribute".
                    else -> ""
                }
            if (msgAttValue.isNotEmpty()) {
                println("Selected value is $msgAttValue")
            }
        }
        println("Enter a message.")
        message = input.nextLine()
        pubMessageFIFO(message, topicArn, msgAttValue, duplication, groupId, deduplicationID)
    } else {
        println("Enter a message.")
        message = input.nextLine()
        pubMessage(message, topicArn)
    }
    println(DASHES)

    println(DASHES)
    println("8. Display the message. Press any key to continue.")
    input.nextLine()
    val messageList: List<Message>? = receiveMessages(sqsQueueUrl, msgAttValue)
    if (messageList != null) {
        for (mes in messageList) {
            println("Message Id: ${mes.messageId}")
            println("Full Message: ${mes.body}")
        }
    }
    println(DASHES)

    println(DASHES)
    println("9. Delete the received message. Press any key to continue.")
    input.nextLine()
    if (messageList != null) {
        deleteMessages(sqsQueueUrl, messageList)
    }
    println(DASHES)

    println(DASHES)
    println("10. Unsubscribe from the topic and delete the queue. Press any key to continue.")
    input.nextLine()
    unSub(subscriptionArn)
    deleteSQSQueue(sqsQueueName)
    println(DASHES)

    println(DASHES)
    println("11. Delete the topic. Press any key to continue.")
    input.nextLine()
    deleteSNSTopic(topicArn)
    println(DASHES)

    println(DASHES)
    println("The SNS/SQS workflow has completed successfully.")
    println(DASHES)
}

/**
 * Deletes the SNS topic identified by [topicArnVal].
 */
suspend fun deleteSNSTopic(topicArnVal: String?) {
    val request =
        DeleteTopicRequest {
            topicArn = topicArnVal
        }

    SnsClient { region = "us-east-1" }.use { snsClient ->
        snsClient.deleteTopic(request)
        println("$topicArnVal was deleted")
    }
}

/**
 * Looks up the URL of the queue named [queueNameVal] and deletes that queue.
 */
suspend fun deleteSQSQueue(queueNameVal: String) {
    val getQueueRequest =
        GetQueueUrlRequest {
            queueName = queueNameVal
        }

    SqsClient { region = "us-east-1" }.use { sqsClient ->
        val queueUrlVal = sqsClient.getQueueUrl(getQueueRequest).queueUrl
        val deleteQueueRequest =
            DeleteQueueRequest {
                queueUrl = queueUrlVal
            }

        sqsClient.deleteQueue(deleteQueueRequest)
        println("$queueNameVal was successfully deleted.")
    }
}

/**
 * Removes the SNS subscription identified by [subscripArn].
 */
suspend fun unSub(subscripArn: String?) {
    val request =
        UnsubscribeRequest {
            subscriptionArn = subscripArn
        }

    SnsClient { region = "us-east-1" }.use { snsClient ->
        snsClient.unsubscribe(request)
        println("Subscription was removed for $subscripArn")
    }
}

/**
 * Deletes [messages] from the queue at [queueUrlVal] in one batch call.
 *
 * Each batch entry carries the message's receipt handle, which SQS requires
 * to identify the delivery being deleted. Nothing is sent when [messages]
 * is empty, because SQS rejects an empty batch.
 */
suspend fun deleteMessages(queueUrlVal: String?, messages: List<Message>) {
    if (messages.isEmpty()) {
        println("There are no messages to delete.")
        return
    }

    val entriesVal =
        messages.map { msg ->
            DeleteMessageBatchRequestEntry {
                id = msg.messageId
                receiptHandle = msg.receiptHandle
            }
        }

    val deleteMessageBatchRequest =
        DeleteMessageBatchRequest {
            queueUrl = queueUrlVal
            entries = entriesVal
        }

    SqsClient { region = "us-east-1" }.use { sqsClient ->
        sqsClient.deleteMessageBatch(deleteMessageBatchRequest)
        println("The batch delete of messages was successful")
    }
}

/**
 * Receives up to five messages from the queue at [queueUrlVal].
 *
 * When [msgAttValue] is non-empty the request waits up to one second for
 * messages; otherwise no wait time is set on the request.
 *
 * @return the messages in the response, or null when the response has none.
 */
suspend fun receiveMessages(queueUrlVal: String?, msgAttValue: String): List<Message>? {
    val request =
        ReceiveMessageRequest {
            queueUrl = queueUrlVal
            maxNumberOfMessages = 5
            if (msgAttValue.isNotEmpty()) {
                waitTimeSeconds = 1
            }
        }

    SqsClient { region = "us-east-1" }.use { sqsClient ->
        return sqsClient.receiveMessage(request).messages
    }
}

/**
 * Publishes [messageVal] to the (non-FIFO) topic [topicArnVal].
 */
suspend fun pubMessage(messageVal: String?, topicArnVal: String?) {
    val request =
        PublishRequest {
            message = messageVal
            topicArn = topicArnVal
        }

    SnsClient { region = "us-east-1" }.use { snsClient ->
        val result = snsClient.publish(request)
        println("${result.messageId} message sent.")
    }
}

/**
 * Publishes [messageVal] to the FIFO topic [topicArnVal].
 *
 * @param msgAttValue tone to attach as the "tone" message attribute; empty
 *   means the message is published without attributes.
 * @param duplication "y" when the topic uses content-based deduplication, in
 *   which case no explicit deduplication ID is sent.
 * @param groupIdVal the message group ID.
 * @param deduplicationID explicit deduplication ID, used only when
 *   [duplication] is not "y".
 */
suspend fun pubMessageFIFO(
    messageVal: String?,
    topicArnVal: String?,
    msgAttValue: String,
    duplication: String,
    groupIdVal: String?,
    deduplicationID: String?,
) {
    val contentBasedDedup = duplication == "y"

    // The subscription filter policy matches on an attribute named "tone",
    // so the chosen tone must be sent as that attribute's value.
    val attributes: Map<String, aws.sdk.kotlin.services.sns.model.MessageAttributeValue>? =
        if (msgAttValue.isEmpty()) {
            null
        } else {
            mapOf(
                "tone" to
                    aws.sdk.kotlin.services.sns.model.MessageAttributeValue {
                        dataType = "String"
                        stringValue = msgAttValue
                    },
            )
        }

    val request =
        PublishRequest {
            topicArn = topicArnVal
            message = messageVal
            messageGroupId = groupIdVal
            if (!contentBasedDedup) {
                messageDeduplicationId = deduplicationID
            }
            // Attributes are sent regardless of the deduplication mode.
            messageAttributes = attributes
        }

    SnsClient { region = "us-east-1" }.use { snsClient ->
        val result = snsClient.publish(request)
        println(result.messageId.toString() + " Message sent.")
    }
}

/**
 * Subscribes the queue [queueArnVal] to the topic [topicArnVal] and, when
 * [filterList] contains any tones, sets a "FilterPolicy" subscription
 * attribute of the form {"tone": [...]} on the new subscription.
 *
 * Null entries in [filterList] are ignored.
 *
 * @return the ARN of the new subscription.
 */
suspend fun subQueue(topicArnVal: String?, queueArnVal: String, filterList: List<String?>): String? {
    val request =
        SubscribeRequest {
            protocol = "sqs"
            endpoint = queueArnVal
            returnSubscriptionArn = true
            topicArn = topicArnVal
        }
    val tones = filterList.filterNotNull()

    SnsClient { region = "us-east-1" }.use { snsClient ->
        val result = snsClient.subscribe(request)
        println(
            "The queue $queueArnVal has been subscribed to the topic $topicArnVal " +
                "with the subscription ARN ${result.subscriptionArn}",
        )

        if (tones.isNotEmpty()) {
            // Build {"tone": ["<tone>", ...]} with Gson so values are escaped.
            val gson = Gson()
            val jsonObject = gson.fromJson("{\"tone\": []}", JsonObject::class.java)
            val toneArray = jsonObject.getAsJsonArray("tone")
            for (tone in tones) {
                toneArray.add(JsonPrimitive(tone))
            }
            val filterPolicyJson: String = gson.toJson(jsonObject)
            println(filterPolicyJson)

            val attRequest =
                SetSubscriptionAttributesRequest {
                    subscriptionArn = result.subscriptionArn
                    attributeName = "FilterPolicy"
                    attributeValue = filterPolicyJson
                }
            snsClient.setSubscriptionAttributes(attRequest)
        }
        return result.subscriptionArn
    }
}

/**
 * Sets [policy] as the "Policy" attribute of the queue at [queueUrlVal].
 */
suspend fun setQueueAttr(queueUrlVal: String?, policy: String) {
    val attrMap: MutableMap<String, String> = HashMap()
    attrMap[QueueAttributeName.Policy.toString()] = policy

    val attributesRequest =
        SetQueueAttributesRequest {
            queueUrl = queueUrlVal
            attributes = attrMap
        }

    SqsClient { region = "us-east-1" }.use { sqsClient ->
        sqsClient.setQueueAttributes(attributesRequest)
        println("The policy has been successfully attached.")
    }
}

/**
 * Requests the QueueArn attribute of the queue at [queueUrlVal].
 *
 * @return the value of the first attribute in the response (QueueArn is the
 *   only attribute requested), or an empty string when none is returned.
 */
suspend fun getSQSQueueAttrs(queueUrlVal: String?): String {
    val attributesRequest =
        GetQueueAttributesRequest {
            queueUrl = queueUrlVal
            attributeNames = listOf(QueueAttributeName.QueueArn)
        }

    SqsClient { region = "us-east-1" }.use { sqsClient ->
        val response = sqsClient.getQueueAttributes(attributesRequest)
        val arnEntry = response.attributes?.entries?.firstOrNull()
        if (arnEntry != null) {
            println("${arnEntry.key} : ${arnEntry.value}")
            return arnEntry.value
        }
    }
    return ""
}

/**
 * Creates a queue named [queueNameVal], as a FIFO queue when [selectFIFO] is
 * true, and then looks up its URL.
 *
 * @return the URL of the queue.
 */
suspend fun createQueue(queueNameVal: String?, selectFIFO: Boolean): String? {
    println("\nCreate Queue")

    val createQueueRequest =
        CreateQueueRequest {
            queueName = queueNameVal
            if (selectFIFO) {
                attributes = mutableMapOf(QueueAttributeName.FifoQueue.toString() to "true")
            }
        }

    SqsClient { region = "us-east-1" }.use { sqsClient ->
        sqsClient.createQueue(createQueueRequest)
        println(if (selectFIFO) "\nGet queue url" else "Get queue url")

        val urlRequest =
            GetQueueUrlRequest {
                queueName = queueNameVal
            }
        return sqsClient.getQueueUrl(urlRequest).queueUrl
    }
}

/**
 * Creates a standard (non-FIFO) SNS topic named [topicName].
 *
 * @return the ARN of the topic.
 */
suspend fun createSNSTopic(topicName: String?): String? {
    val request =
        CreateTopicRequest {
            name = topicName
        }

    SnsClient { region = "us-east-1" }.use { snsClient ->
        val result = snsClient.createTopic(request)
        return result.topicArn
    }
}

/**
 * Creates a FIFO SNS topic named [topicName].
 *
 * Content-based deduplication is enabled only when [duplication] is "y",
 * matching how the publish step decides whether to send an explicit
 * deduplication ID.
 *
 * @return the ARN of the topic.
 */
suspend fun createFIFO(topicName: String?, duplication: String): String? {
    val contentBasedDedup = duplication == "y"
    val topicAttributes: MutableMap<String, String> = HashMap()
    topicAttributes["FifoTopic"] = "true"
    topicAttributes["ContentBasedDeduplication"] = contentBasedDedup.toString()

    val topicRequest =
        CreateTopicRequest {
            name = topicName
            attributes = topicAttributes
        }

    SnsClient { region = "us-east-1" }.use { snsClient ->
        val response = snsClient.createTopic(topicRequest)
        return response.topicArn
    }
}