を使用した Amazon SQS の例 AWS SDK for .NET - AWS SDK for .NET

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

を使用した Amazon SQS の例 AWS SDK for .NET

次のコード例は、Amazon AWS SDK for .NET で を使用してアクションを実行し、一般的なシナリオを実装する方法を示していますSQS。

アクションはより大きなプログラムからのコードの抜粋であり、コンテキスト内で実行する必要があります。アクションは個々のサービス機能を呼び出す方法を示していますが、コンテキスト内のアクションは、関連するシナリオで確認できます。

「シナリオ」は、1 つのサービス内から、または他の AWS のサービスと組み合わせて複数の関数を呼び出し、特定のタスクを実行する方法を示すコード例です。

各例には、完全なソースコードへのリンクが含まれています。ここでは、コンテキストでコードを設定および実行する方法の手順を確認できます。

開始方法

次のコード例は、Amazon の使用を開始する方法を示していますSQS。

AWS SDK for .NET
注記

については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリでの設定と実行の方法を確認してください。

using Amazon.SQS; using Amazon.SQS.Model; namespace SQSActions; public static class HelloSQS { static async Task Main(string[] args) { var sqsClient = new AmazonSQSClient(); Console.WriteLine($"Hello Amazon SQS! Following are some of your queues:"); Console.WriteLine(); // You can use await and any of the async methods to get a response. // Let's get the first five queues. var response = await sqsClient.ListQueuesAsync( new ListQueuesRequest() { MaxResults = 5 }); foreach (var queue in response.QueueUrls) { Console.WriteLine($"\tQueue Url: {queue}"); Console.WriteLine(); } } }
  • API 詳細については、 リファレンスListQueuesの「」を参照してください。 AWS SDK for .NET API

アクション

次のコード例は、CreateQueue を使用する方法を示しています。

AWS SDK for .NET
注記

については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリでの設定と実行の方法を確認してください。

特定の名前を持つキューを作成します。

/// <summary> /// Create a queue with a specific name. /// </summary> /// <param name="queueName">The name for the queue.</param> /// <param name="useFifoQueue">True to use a FIFO queue.</param> /// <returns>The url for the queue.</returns> public async Task<string> CreateQueueWithName(string queueName, bool useFifoQueue) { int maxMessage = 256 * 1024; var queueAttributes = new Dictionary<string, string> { { QueueAttributeName.MaximumMessageSize, maxMessage.ToString() } }; var createQueueRequest = new CreateQueueRequest() { QueueName = queueName, Attributes = queueAttributes }; if (useFifoQueue) { // Update the name if it is not correct for a FIFO queue. if (!queueName.EndsWith(".fifo")) { createQueueRequest.QueueName = queueName + ".fifo"; } // Add an attribute for a FIFO queue. createQueueRequest.Attributes.Add( QueueAttributeName.FifoQueue, "true"); } var createResponse = await _amazonSQSClient.CreateQueueAsync( new CreateQueueRequest() { QueueName = queueName }); return createResponse.QueueUrl; }

Amazon SQSキューを作成し、メッセージを送信します。

using System; using System.Collections.Generic; using System.Threading.Tasks; using Amazon; using Amazon.SQS; using Amazon.SQS.Model; public class CreateSendExample { // Specify your AWS Region (an example Region is shown). private static readonly string QueueName = "Example_Queue"; private static readonly RegionEndpoint ServiceRegion = RegionEndpoint.USWest2; private static IAmazonSQS client; public static async Task Main() { client = new AmazonSQSClient(ServiceRegion); var createQueueResponse = await CreateQueue(client, QueueName); string queueUrl = createQueueResponse.QueueUrl; Dictionary<string, MessageAttributeValue> messageAttributes = new Dictionary<string, MessageAttributeValue> { { "Title", new MessageAttributeValue { DataType = "String", StringValue = "The Whistler" } }, { "Author", new MessageAttributeValue { DataType = "String", StringValue = "John Grisham" } }, { "WeeksOn", new MessageAttributeValue { DataType = "Number", StringValue = "6" } }, }; string messageBody = "Information about current NY Times fiction bestseller for week of 12/11/2016."; var sendMsgResponse = await SendMessage(client, queueUrl, messageBody, messageAttributes); } /// <summary> /// Creates a new Amazon SQS queue using the queue name passed to it /// in queueName. /// </summary> /// <param name="client">An SQS client object used to send the message.</param> /// <param name="queueName">A string representing the name of the queue /// to create.</param> /// <returns>A CreateQueueResponse that contains information about the /// newly created queue.</returns> public static async Task<CreateQueueResponse> CreateQueue(IAmazonSQS client, string queueName) { var request = new CreateQueueRequest { QueueName = queueName, Attributes = new Dictionary<string, string> { { "DelaySeconds", "60" }, { "MessageRetentionPeriod", "86400" }, }, }; var response = await client.CreateQueueAsync(request); Console.WriteLine($"Created a queue with URL : {response.QueueUrl}"); return response; } /// <summary> /// Sends a message to an SQS queue. /// </summary> /// <param name="client">An SQS client object used to send the message.</param> /// <param name="queueUrl">The URL of the queue to which to send the /// message.</param> /// <param name="messageBody">A string representing the body of the /// message to be sent to the queue.</param> /// <param name="messageAttributes">Attributes for the message to be /// sent to the queue.</param> /// <returns>A SendMessageResponse object that contains information /// about the message that was sent.</returns> public static async Task<SendMessageResponse> SendMessage( IAmazonSQS client, string queueUrl, string messageBody, Dictionary<string, MessageAttributeValue> messageAttributes) { var sendMessageRequest = new SendMessageRequest { DelaySeconds = 10, MessageAttributes = messageAttributes, MessageBody = messageBody, QueueUrl = queueUrl, }; var response = await client.SendMessageAsync(sendMessageRequest); Console.WriteLine($"Sent a message with id : {response.MessageId}"); return response; } }
  • API 詳細については、 リファレンスCreateQueueの「」を参照してください。 AWS SDK for .NET API

次のコード例は、DeleteMessage を使用する方法を示しています。

AWS SDK for .NET
注記

については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリでの設定と実行の方法を確認してください。

Amazon SQSキューからメッセージを受信し、メッセージを削除します。

public static async Task Main() { // If the AWS Region you want to use is different from // the AWS Region defined for the default user, supply // the specify your AWS Region to the client constructor. var client = new AmazonSQSClient(); string queueName = "Example_Queue"; var queueUrl = await GetQueueUrl(client, queueName); Console.WriteLine($"The SQS queue's URL is {queueUrl}"); var response = await ReceiveAndDeleteMessage(client, queueUrl); Console.WriteLine($"Message: {response.Messages[0]}"); } /// <summary> /// Retrieve the queue URL for the queue named in the queueName /// property using the client object. /// </summary> /// <param name="client">The Amazon SQS client used to retrieve the /// queue URL.</param> /// <param name="queueName">A string representing name of the queue /// for which to retrieve the URL.</param> /// <returns>The URL of the queue.</returns> public static async Task<string> GetQueueUrl(IAmazonSQS client, string queueName) { var request = new GetQueueUrlRequest { QueueName = queueName, }; GetQueueUrlResponse response = await client.GetQueueUrlAsync(request); return response.QueueUrl; } /// <summary> /// Retrieves the message from the quque at the URL passed in the /// queueURL parameters using the client. /// </summary> /// <param name="client">The SQS client used to retrieve a message.</param> /// <param name="queueUrl">The URL of the queue from which to retrieve /// a message.</param> /// <returns>The response from the call to ReceiveMessageAsync.</returns> public static async Task<ReceiveMessageResponse> ReceiveAndDeleteMessage(IAmazonSQS client, string queueUrl) { // Receive a single message from the queue. var receiveMessageRequest = new ReceiveMessageRequest { AttributeNames = { "SentTimestamp" }, MaxNumberOfMessages = 1, MessageAttributeNames = { "All" }, QueueUrl = queueUrl, VisibilityTimeout = 0, WaitTimeSeconds = 0, }; var receiveMessageResponse = await client.ReceiveMessageAsync(receiveMessageRequest); // Delete the received message from the queue. var deleteMessageRequest = new DeleteMessageRequest { QueueUrl = queueUrl, ReceiptHandle = receiveMessageResponse.Messages[0].ReceiptHandle, }; await client.DeleteMessageAsync(deleteMessageRequest); return receiveMessageResponse; } }
  • API 詳細については、 リファレンスDeleteMessageの「」を参照してください。 AWS SDK for .NET API

次の例は、DeleteMessageBatch を使用する方法を説明しています。

AWS SDK for .NET
注記

については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリでの設定と実行の方法を確認してください。

/// <summary> /// Delete a batch of messages from a queue by its url. /// </summary> /// <param name="queueUrl">The url of the queue.</param> /// <returns>True if successful.</returns> public async Task<bool> DeleteMessageBatchByUrl(string queueUrl, List<Message> messages) { var deleteRequest = new DeleteMessageBatchRequest() { QueueUrl = queueUrl, Entries = new List<DeleteMessageBatchRequestEntry>() }; foreach (var message in messages) { deleteRequest.Entries.Add(new DeleteMessageBatchRequestEntry() { ReceiptHandle = message.ReceiptHandle, Id = message.MessageId }); } var deleteResponse = await _amazonSQSClient.DeleteMessageBatchAsync(deleteRequest); return deleteResponse.Failed.Any(); }
  • API 詳細については、 リファレンスDeleteMessageBatchの「」を参照してください。 AWS SDK for .NET API

次の例は、DeleteQueue を使用する方法を説明しています。

AWS SDK for .NET
注記

の詳細については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリでの設定と実行の方法を確認してください。

を使用してキューを削除しますURL。

/// <summary> /// Delete a queue by its URL. /// </summary> /// <param name="queueUrl">The url of the queue.</param> /// <returns>True if successful.</returns> public async Task<bool> DeleteQueueByUrl(string queueUrl) { var deleteResponse = await _amazonSQSClient.DeleteQueueAsync( new DeleteQueueRequest() { QueueUrl = queueUrl }); return deleteResponse.HttpStatusCode == HttpStatusCode.OK; }
  • API 詳細については、 リファレンスDeleteQueueの「」を参照してください。 AWS SDK for .NET API

次のコード例は、GetQueueAttributes を使用する方法を示しています。

AWS SDK for .NET
注記

の詳細については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリでの設定と実行の方法を確認してください。

/// <summary> /// Get the ARN for a queue from its URL. /// </summary> /// <param name="queueUrl">The URL of the queue.</param> /// <returns>The ARN of the queue.</returns> public async Task<string> GetQueueArnByUrl(string queueUrl) { var getAttributesRequest = new GetQueueAttributesRequest() { QueueUrl = queueUrl, AttributeNames = new List<string>() { QueueAttributeName.QueueArn } }; var getAttributesResponse = await _amazonSQSClient.GetQueueAttributesAsync( getAttributesRequest); return getAttributesResponse.QueueARN; }
  • API 詳細については、 リファレンスGetQueueAttributesの「」を参照してください。 AWS SDK for .NET API

次の例は、GetQueueUrl を使用する方法を説明しています。

AWS SDK for .NET
注記

の詳細については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリでの設定と実行の方法を確認してください。

using System; using System.Threading.Tasks; using Amazon.SQS; using Amazon.SQS.Model; public class GetQueueUrl { /// <summary> /// Initializes the Amazon SQS client object and then calls the /// GetQueueUrlAsync method to retrieve the URL of an Amazon SQS /// queue. /// </summary> public static async Task Main() { // If the Amazon SQS message queue is not in the same AWS Region as your // default user, you need to provide the AWS Region as a parameter to the // client constructor. var client = new AmazonSQSClient(); string queueName = "New-Example-Queue"; try { var response = await client.GetQueueUrlAsync(queueName); if (response.HttpStatusCode == System.Net.HttpStatusCode.OK) { Console.WriteLine($"The URL for {queueName} is: {response.QueueUrl}"); } } catch (QueueDoesNotExistException ex) { Console.WriteLine(ex.Message); Console.WriteLine($"The queue {queueName} was not found."); } } }
  • API 詳細については、 リファレンスGetQueueUrlの「」を参照してください。 AWS SDK for .NET API

次のコード例は、ReceiveMessage を使用する方法を示しています。

AWS SDK for .NET
注記

の詳細については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリでの設定と実行の方法を確認してください。

を使用してキューからメッセージを受信しますURL。

/// <summary> /// Receive messages from a queue by its URL. /// </summary> /// <param name="queueUrl">The url of the queue.</param> /// <returns>The list of messages.</returns> public async Task<List<Message>> ReceiveMessagesByUrl(string queueUrl, int maxMessages) { // Setting WaitTimeSeconds to non-zero enables long polling. // For information about long polling, see // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html var messageResponse = await _amazonSQSClient.ReceiveMessageAsync( new ReceiveMessageRequest() { QueueUrl = queueUrl, MaxNumberOfMessages = maxMessages, WaitTimeSeconds = 1 }); return messageResponse.Messages; }

Amazon SQSキューからメッセージを受信し、メッセージを削除します。

public static async Task Main() { // If the AWS Region you want to use is different from // the AWS Region defined for the default user, supply // the specify your AWS Region to the client constructor. var client = new AmazonSQSClient(); string queueName = "Example_Queue"; var queueUrl = await GetQueueUrl(client, queueName); Console.WriteLine($"The SQS queue's URL is {queueUrl}"); var response = await ReceiveAndDeleteMessage(client, queueUrl); Console.WriteLine($"Message: {response.Messages[0]}"); } /// <summary> /// Retrieve the queue URL for the queue named in the queueName /// property using the client object. /// </summary> /// <param name="client">The Amazon SQS client used to retrieve the /// queue URL.</param> /// <param name="queueName">A string representing name of the queue /// for which to retrieve the URL.</param> /// <returns>The URL of the queue.</returns> public static async Task<string> GetQueueUrl(IAmazonSQS client, string queueName) { var request = new GetQueueUrlRequest { QueueName = queueName, }; GetQueueUrlResponse response = await client.GetQueueUrlAsync(request); return response.QueueUrl; } /// <summary> /// Retrieves the message from the quque at the URL passed in the /// queueURL parameters using the client. /// </summary> /// <param name="client">The SQS client used to retrieve a message.</param> /// <param name="queueUrl">The URL of the queue from which to retrieve /// a message.</param> /// <returns>The response from the call to ReceiveMessageAsync.</returns> public static async Task<ReceiveMessageResponse> ReceiveAndDeleteMessage(IAmazonSQS client, string queueUrl) { // Receive a single message from the queue. var receiveMessageRequest = new ReceiveMessageRequest { AttributeNames = { "SentTimestamp" }, MaxNumberOfMessages = 1, MessageAttributeNames = { "All" }, QueueUrl = queueUrl, VisibilityTimeout = 0, WaitTimeSeconds = 0, }; var receiveMessageResponse = await client.ReceiveMessageAsync(receiveMessageRequest); // Delete the received message from the queue. var deleteMessageRequest = new DeleteMessageRequest { QueueUrl = queueUrl, ReceiptHandle = receiveMessageResponse.Messages[0].ReceiptHandle, }; await client.DeleteMessageAsync(deleteMessageRequest); return receiveMessageResponse; } }
  • API 詳細については、 リファレンスReceiveMessageの「」を参照してください。 AWS SDK for .NET API

次の例は、SendMessage を使用する方法を説明しています。

AWS SDK for .NET
注記

の詳細については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリでの設定と実行の方法を確認してください。

Amazon SQSキューを作成し、メッセージを送信します。

using System; using System.Collections.Generic; using System.Threading.Tasks; using Amazon; using Amazon.SQS; using Amazon.SQS.Model; public class CreateSendExample { // Specify your AWS Region (an example Region is shown). private static readonly string QueueName = "Example_Queue"; private static readonly RegionEndpoint ServiceRegion = RegionEndpoint.USWest2; private static IAmazonSQS client; public static async Task Main() { client = new AmazonSQSClient(ServiceRegion); var createQueueResponse = await CreateQueue(client, QueueName); string queueUrl = createQueueResponse.QueueUrl; Dictionary<string, MessageAttributeValue> messageAttributes = new Dictionary<string, MessageAttributeValue> { { "Title", new MessageAttributeValue { DataType = "String", StringValue = "The Whistler" } }, { "Author", new MessageAttributeValue { DataType = "String", StringValue = "John Grisham" } }, { "WeeksOn", new MessageAttributeValue { DataType = "Number", StringValue = "6" } }, }; string messageBody = "Information about current NY Times fiction bestseller for week of 12/11/2016."; var sendMsgResponse = await SendMessage(client, queueUrl, messageBody, messageAttributes); } /// <summary> /// Creates a new Amazon SQS queue using the queue name passed to it /// in queueName. /// </summary> /// <param name="client">An SQS client object used to send the message.</param> /// <param name="queueName">A string representing the name of the queue /// to create.</param> /// <returns>A CreateQueueResponse that contains information about the /// newly created queue.</returns> public static async Task<CreateQueueResponse> CreateQueue(IAmazonSQS client, string queueName) { var request = new CreateQueueRequest { QueueName = queueName, Attributes = new Dictionary<string, string> { { "DelaySeconds", "60" }, { "MessageRetentionPeriod", "86400" }, }, }; var response = await client.CreateQueueAsync(request); Console.WriteLine($"Created a queue with URL : {response.QueueUrl}"); return response; } /// <summary> /// Sends a message to an SQS queue. /// </summary> /// <param name="client">An SQS client object used to send the message.</param> /// <param name="queueUrl">The URL of the queue to which to send the /// message.</param> /// <param name="messageBody">A string representing the body of the /// message to be sent to the queue.</param> /// <param name="messageAttributes">Attributes for the message to be /// sent to the queue.</param> /// <returns>A SendMessageResponse object that contains information /// about the message that was sent.</returns> public static async Task<SendMessageResponse> SendMessage( IAmazonSQS client, string queueUrl, string messageBody, Dictionary<string, MessageAttributeValue> messageAttributes) { var sendMessageRequest = new SendMessageRequest { DelaySeconds = 10, MessageAttributes = messageAttributes, MessageBody = messageBody, QueueUrl = queueUrl, }; var response = await client.SendMessageAsync(sendMessageRequest); Console.WriteLine($"Sent a message with id : {response.MessageId}"); return response; } }
  • API 詳細については、 リファレンスSendMessageの「」を参照してください。 AWS SDK for .NET API

次のコード例は、SetQueueAttributes を使用する方法を示しています。

AWS SDK for .NET
注記

の詳細については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリでの設定と実行の方法を確認してください。

トピックのキューのポリシー属性を設定します。

/// <summary> /// Set the policy attribute of a queue for a topic. /// </summary> /// <param name="queueArn">The ARN of the queue.</param> /// <param name="topicArn">The ARN of the topic.</param> /// <param name="queueUrl">The url for the queue.</param> /// <returns>True if successful.</returns> public async Task<bool> SetQueuePolicyForTopic(string queueArn, string topicArn, string queueUrl) { var queuePolicy = "{" + "\"Version\": \"2012-10-17\"," + "\"Statement\": [{" + "\"Effect\": \"Allow\"," + "\"Principal\": {" + $"\"Service\": " + "\"sns.amazonaws.com\"" + "}," + "\"Action\": \"sqs:SendMessage\"," + $"\"Resource\": \"{queueArn}\"," + "\"Condition\": {" + "\"ArnEquals\": {" + $"\"aws:SourceArn\": \"{topicArn}\"" + "}" + "}" + "}]" + "}"; var attributesResponse = await _amazonSQSClient.SetQueueAttributesAsync( new SetQueueAttributesRequest() { QueueUrl = queueUrl, Attributes = new Dictionary<string, string>() { { "Policy", queuePolicy } } }); return attributesResponse.HttpStatusCode == HttpStatusCode.OK; }
  • API 詳細については、 リファレンスSetQueueAttributesの「」を参照してください。 AWS SDK for .NET API

シナリオ

次のコードサンプルは、以下の操作方法を示しています。

  • トピック (FIFO または 以外) を作成しますFIFO。

  • フィルターを適用するオプションを使用して、複数のキューをトピックにサブスクライブします。

  • メッセージをトピックに発行します。

  • キューをポーリングして受信メッセージを確認します。

AWS SDK for .NET
注記

の詳細については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリでの設定と実行の方法を確認してください。

コマンドプロンプトからインタラクティブのシナリオを実行します。

/// <summary> /// Console application to run a workflow scenario for topics and queues. /// </summary> public static class TopicsAndQueues { private static bool _useFifoTopic = false; private static bool _useContentBasedDeduplication = false; private static string _topicName = null!; private static string _topicArn = null!; private static readonly int _queueCount = 2; private static readonly string[] _queueUrls = new string[_queueCount]; private static readonly string[] _subscriptionArns = new string[_queueCount]; private static readonly string[] _tones = { "cheerful", "funny", "serious", "sincere" }; public static SNSWrapper SnsWrapper { get; set; } = null!; public static SQSWrapper SqsWrapper { get; set; } = null!; public static bool UseConsole { get; set; } = true; static async Task Main(string[] args) { // Set up dependency injection for Amazon EventBridge. using var host = Host.CreateDefaultBuilder(args) .ConfigureLogging(logging => logging.AddFilter("System", LogLevel.Debug) .AddFilter<DebugLoggerProvider>("Microsoft", LogLevel.Information) .AddFilter<ConsoleLoggerProvider>("Microsoft", LogLevel.Trace)) .ConfigureServices((_, services) => services.AddAWSService<IAmazonSQS>() .AddAWSService<IAmazonSimpleNotificationService>() .AddTransient<SNSWrapper>() .AddTransient<SQSWrapper>() ) .Build(); ServicesSetup(host); PrintDescription(); await RunScenario(); } /// <summary> /// Populate the services for use within the console application. /// </summary> /// <param name="host">The services host.</param> private static void ServicesSetup(IHost host) { SnsWrapper = host.Services.GetRequiredService<SNSWrapper>(); SqsWrapper = host.Services.GetRequiredService<SQSWrapper>(); } /// <summary> /// Run the scenario for working with topics and queues. /// </summary> /// <returns>True if successful.</returns> public static async Task<bool> RunScenario() { try { await SetupTopic(); await SetupQueues(); await PublishMessages(); foreach (var queueUrl in _queueUrls) { var messages = await PollForMessages(queueUrl); if (messages.Any()) { await DeleteMessages(queueUrl, messages); } } await CleanupResources(); Console.WriteLine("Messaging with topics and queues workflow is complete."); return true; } catch (Exception ex) { Console.WriteLine(new string('-', 80)); Console.WriteLine($"There was a problem running the scenario: {ex.Message}"); await CleanupResources(); Console.WriteLine(new string('-', 80)); return false; } } /// <summary> /// Print a description for the tasks in the workflow. /// </summary> /// <returns>Async task.</returns> private static void PrintDescription() { Console.WriteLine(new string('-', 80)); Console.WriteLine($"Welcome to messaging with topics and queues."); Console.WriteLine(new string('-', 80)); Console.WriteLine($"In this workflow, you will create an SNS topic and subscribe {_queueCount} SQS queues to the topic." + $"\r\nYou can select from several options for configuring the topic and the subscriptions for the 2 queues." + $"\r\nYou can then post to the topic and see the results in the queues.\r\n"); Console.WriteLine(new string('-', 80)); } /// <summary> /// Set up the SNS topic to be used with the queues. /// </summary> /// <returns>Async task.</returns> private static async Task<string> SetupTopic() { Console.WriteLine(new string('-', 80)); Console.WriteLine($"SNS topics can be configured as FIFO (First-In-First-Out)." + $"\r\nFIFO topics deliver messages in order and support deduplication and message filtering." + $"\r\nYou can then post to the topic and see the results in the queues.\r\n"); _useFifoTopic = GetYesNoResponse("Would you like to work with FIFO topics?"); if (_useFifoTopic) { Console.WriteLine(new string('-', 80)); _topicName = GetUserResponse("Enter a name for your SNS topic: ", "example-topic"); Console.WriteLine( "Because you have selected a FIFO topic, '.fifo' must be appended to the topic name.\r\n"); Console.WriteLine(new string('-', 80)); Console.WriteLine($"Because you have chosen a FIFO topic, deduplication is supported." + $"\r\nDeduplication IDs are either set in the message or automatically generated " + $"\r\nfrom content using a hash function.\r\n" + $"\r\nIf a message is successfully published to an SNS FIFO topic, any message " + $"\r\npublished and determined to have the same deduplication ID, " + $"\r\nwithin the five-minute deduplication interval, is accepted but not delivered.\r\n" + $"\r\nFor more information about deduplication, " + $"\r\nsee https://docs.aws.amazon.com/sns/latest/dg/fifo-message-dedup.html."); _useContentBasedDeduplication = GetYesNoResponse("Use content-based deduplication instead of entering a deduplication ID?"); Console.WriteLine(new string('-', 80)); } _topicArn = await SnsWrapper.CreateTopicWithName(_topicName, _useFifoTopic, _useContentBasedDeduplication); Console.WriteLine($"Your new topic with the name {_topicName}" + $"\r\nand Amazon Resource Name (ARN) {_topicArn}" + $"\r\nhas been created.\r\n"); Console.WriteLine(new string('-', 80)); return _topicArn; } /// <summary> /// Set up the queues. /// </summary> /// <returns>Async task.</returns> private static async Task SetupQueues() { Console.WriteLine(new string('-', 80)); Console.WriteLine($"Now you will create {_queueCount} Amazon Simple Queue Service (Amazon SQS) queues to subscribe to the topic."); // Repeat this section for each queue. for (int i = 0; i < _queueCount; i++) { var queueName = GetUserResponse("Enter a name for an Amazon SQS queue: ", $"example-queue-{i}"); if (_useFifoTopic) { // Only explain this once. if (i == 0) { Console.WriteLine( "Because you have selected a FIFO topic, '.fifo' must be appended to the queue name."); } var queueUrl = await SqsWrapper.CreateQueueWithName(queueName, _useFifoTopic); _queueUrls[i] = queueUrl; Console.WriteLine($"Your new queue with the name {queueName}" + $"\r\nand queue URL {queueUrl}" + $"\r\nhas been created.\r\n"); if (i == 0) { Console.WriteLine( $"The queue URL is used to retrieve the queue ARN,\r\n" + $"which is used to create a subscription."); Console.WriteLine(new string('-', 80)); } var queueArn = await SqsWrapper.GetQueueArnByUrl(queueUrl); if (i == 0) { Console.WriteLine( $"An AWS Identity and Access Management (IAM) policy must be attached to an SQS queue, enabling it to receive\r\n" + $"messages from an SNS topic"); } await SqsWrapper.SetQueuePolicyForTopic(queueArn, _topicArn, queueUrl); await SetupFilters(i, queueArn, queueName); } } Console.WriteLine(new string('-', 80)); } /// <summary> /// Set up filters with user options for a queue. /// </summary> /// <param name="queueCount">The number of this queue.</param> /// <param name="queueArn">The ARN of the queue.</param> /// <param name="queueName">The name of the queue.</param> /// <returns>Async Task.</returns> public static async Task SetupFilters(int queueCount, string queueArn, string queueName) { if (_useFifoTopic) { Console.WriteLine(new string('-', 80)); // Only explain this once. if (queueCount == 0) { Console.WriteLine( "Subscriptions to a FIFO topic can have filters." + "If you add a filter to this subscription, then only the filtered messages " + "will be received in the queue."); Console.WriteLine( "For information about message filtering, " + "see https://docs.aws.amazon.com/sns/latest/dg/sns-message-filtering.html"); Console.WriteLine( "For this example, you can filter messages by a" + "TONE attribute."); } var useFilter = GetYesNoResponse($"Filter messages for {queueName}'s subscription to the topic?"); string? filterPolicy = null; if (useFilter) { filterPolicy = CreateFilterPolicy(); } var subscriptionArn = await SnsWrapper.SubscribeTopicWithFilter(_topicArn, filterPolicy, queueArn); _subscriptionArns[queueCount] = subscriptionArn; Console.WriteLine( $"The queue {queueName} has been subscribed to the topic {_topicName} " + $"with the subscription ARN {subscriptionArn}"); Console.WriteLine(new string('-', 80)); } } /// <summary> /// Use user input to create a filter policy for a subscription. /// </summary> /// <returns>The serialized filter policy.</returns> public static string CreateFilterPolicy() { Console.WriteLine(new string('-', 80)); Console.WriteLine( $"You can filter messages by one or more of the following" + $"TONE attributes."); List<string> filterSelections = new List<string>(); var selectionNumber = 0; do { Console.WriteLine( $"Enter a number to add a TONE filter, or enter 0 to stop adding filters."); for (int i = 0; i < _tones.Length; i++) { Console.WriteLine($"\t{i + 1}. {_tones[i]}"); } var selection = GetUserResponse("", filterSelections.Any() ? "0" : "1"); int.TryParse(selection, out selectionNumber); if (selectionNumber > 0 && !filterSelections.Contains(_tones[selectionNumber - 1])) { filterSelections.Add(_tones[selectionNumber - 1]); } } while (selectionNumber != 0); var filters = new Dictionary<string, List<string>> { { "tone", filterSelections } }; string filterPolicy = JsonSerializer.Serialize(filters); return filterPolicy; } /// <summary> /// Publish messages using user settings. /// </summary> /// <returns>Async task.</returns> public static async Task PublishMessages() { Console.WriteLine("Now we can publish messages."); var keepSendingMessages = true; string? deduplicationId = null; string? toneAttribute = null; while (keepSendingMessages) { Console.WriteLine(); var message = GetUserResponse("Enter a message to publish.", "This is a sample message"); if (_useFifoTopic) { Console.WriteLine("Because you are using a FIFO topic, you must set a message group ID." + "\r\nAll messages within the same group will be received in the order " + "they were published."); Console.WriteLine(); var messageGroupId = GetUserResponse("Enter a message group ID for this message:", "1"); if (!_useContentBasedDeduplication) { Console.WriteLine("Because you are not using content-based deduplication, " + "you must enter a deduplication ID."); Console.WriteLine("Enter a deduplication ID for this message."); deduplicationId = GetUserResponse("Enter a deduplication ID for this message.", "1"); } if (GetYesNoResponse("Add an attribute to this message?")) { Console.WriteLine("Enter a number for an attribute."); for (int i = 0; i < _tones.Length; i++) { Console.WriteLine($"\t{i + 1}. {_tones[i]}"); } var selection = GetUserResponse("", "1"); int.TryParse(selection, out var selectionNumber); if (selectionNumber > 0 && selectionNumber < _tones.Length) { toneAttribute = _tones[selectionNumber - 1]; } } var messageID = await SnsWrapper.PublishToTopicWithAttribute( _topicArn, message, "tone", toneAttribute, deduplicationId, messageGroupId); Console.WriteLine($"Message published with id {messageID}."); } keepSendingMessages = GetYesNoResponse("Send another message?", false); } } /// <summary> /// Poll for the published messages to see the results of the user's choices. /// </summary> /// <returns>Async task.</returns> public static async Task<List<Message>> PollForMessages(string queueUrl) { Console.WriteLine(new string('-', 80)); Console.WriteLine($"Now the SQS queue at {queueUrl} will be polled to retrieve the messages." + "\r\nPress any key to continue."); if (UseConsole) { Console.ReadLine(); } var moreMessages = true; var messages = new List<Message>(); while (moreMessages) { var newMessages = await SqsWrapper.ReceiveMessagesByUrl(queueUrl, 10); moreMessages = newMessages.Any(); if (moreMessages) { messages.AddRange(newMessages); } } Console.WriteLine($"{messages.Count} message(s) were received by the queue at {queueUrl}."); foreach (var message in messages) { Console.WriteLine("\tMessage:" + $"\n\t{message.Body}"); } Console.WriteLine(new string('-', 80)); return messages; } /// <summary> /// Delete the message using handles in a batch. /// </summary> /// <returns>Async task.</returns> public static async Task DeleteMessages(string queueUrl, List<Message> messages) { Console.WriteLine(new string('-', 80)); Console.WriteLine("Now we can delete the messages in this queue in a batch."); await SqsWrapper.DeleteMessageBatchByUrl(queueUrl, messages); Console.WriteLine(new string('-', 80)); } /// <summary> /// Clean up the resources from the scenario. /// </summary> /// <returns>Async task.</returns> private static async Task CleanupResources() { Console.WriteLine(new string('-', 80)); Console.WriteLine($"Clean up resources."); try { foreach (var queueUrl in _queueUrls) { if (!string.IsNullOrEmpty(queueUrl)) { var deleteQueue = GetYesNoResponse($"Delete queue with url {queueUrl}?"); if (deleteQueue) { await SqsWrapper.DeleteQueueByUrl(queueUrl); } } } foreach (var subscriptionArn in _subscriptionArns) { if (!string.IsNullOrEmpty(subscriptionArn)) { await SnsWrapper.UnsubscribeByArn(subscriptionArn); } } var deleteTopic = GetYesNoResponse($"Delete topic {_topicName}?"); if (deleteTopic) { await SnsWrapper.DeleteTopicByArn(_topicArn); } } catch (Exception ex) { Console.WriteLine($"Unable to clean up resources. Here's why: {ex.Message}."); } Console.WriteLine(new string('-', 80)); } /// <summary> /// Helper method to get a yes or no response from the user. /// </summary> /// <param name="question">The question string to print on the console.</param> /// <param name="defaultAnswer">Optional default answer to use.</param> /// <returns>True if the user responds with a yes.</returns> private static bool GetYesNoResponse(string question, bool defaultAnswer = true) { if (UseConsole) { Console.WriteLine(question); var ynResponse = Console.ReadLine(); var response = ynResponse != null && ynResponse.Equals("y", StringComparison.InvariantCultureIgnoreCase); return response; } // If not using the console, use the default. return defaultAnswer; } /// <summary> /// Helper method to get a string response from the user through the console. /// </summary> /// <param name="question">The question string to print on the console.</param> /// <param name="defaultAnswer">Optional default answer to use.</param> /// <returns>True if the user responds with a yes.</returns> private static string GetUserResponse(string question, string defaultAnswer) { if (UseConsole) { var response = ""; while (string.IsNullOrEmpty(response)) { Console.WriteLine(question); response = Console.ReadLine(); } return response; } // If not using the console, use the default. return defaultAnswer; } }

Amazon SQSオペレーションをラップするクラスを作成します。

/// <summary> /// Wrapper for Amazon Simple Queue Service (SQS) operations. /// </summary> public class SQSWrapper { private readonly IAmazonSQS _amazonSQSClient; /// <summary> /// Constructor for the Amazon SQS wrapper. /// </summary> /// <param name="amazonSQS">The injected Amazon SQS client.</param> public SQSWrapper(IAmazonSQS amazonSQS) { _amazonSQSClient = amazonSQS; } /// <summary> /// Create a queue with a specific name. /// </summary> /// <param name="queueName">The name for the queue.</param> /// <param name="useFifoQueue">True to use a FIFO queue.</param> /// <returns>The url for the queue.</returns> public async Task<string> CreateQueueWithName(string queueName, bool useFifoQueue) { int maxMessage = 256 * 1024; var queueAttributes = new Dictionary<string, string> { { QueueAttributeName.MaximumMessageSize, maxMessage.ToString() } }; var createQueueRequest = new CreateQueueRequest() { QueueName = queueName, Attributes = queueAttributes }; if (useFifoQueue) { // Update the name if it is not correct for a FIFO queue. if (!queueName.EndsWith(".fifo")) { createQueueRequest.QueueName = queueName + ".fifo"; } // Add an attribute for a FIFO queue. createQueueRequest.Attributes.Add( QueueAttributeName.FifoQueue, "true"); } var createResponse = await _amazonSQSClient.CreateQueueAsync( new CreateQueueRequest() { QueueName = queueName }); return createResponse.QueueUrl; } /// <summary> /// Get the ARN for a queue from its URL. /// </summary> /// <param name="queueUrl">The URL of the queue.</param> /// <returns>The ARN of the queue.</returns> public async Task<string> GetQueueArnByUrl(string queueUrl) { var getAttributesRequest = new GetQueueAttributesRequest() { QueueUrl = queueUrl, AttributeNames = new List<string>() { QueueAttributeName.QueueArn } }; var getAttributesResponse = await _amazonSQSClient.GetQueueAttributesAsync( getAttributesRequest); return getAttributesResponse.QueueARN; } /// <summary> /// Set the policy attribute of a queue for a topic. /// </summary> /// <param name="queueArn">The ARN of the queue.</param> /// <param name="topicArn">The ARN of the topic.</param> /// <param name="queueUrl">The url for the queue.</param> /// <returns>True if successful.</returns> public async Task<bool> SetQueuePolicyForTopic(string queueArn, string topicArn, string queueUrl) { var queuePolicy = "{" + "\"Version\": \"2012-10-17\"," + "\"Statement\": [{" + "\"Effect\": \"Allow\"," + "\"Principal\": {" + $"\"Service\": " + "\"sns.amazonaws.com\"" + "}," + "\"Action\": \"sqs:SendMessage\"," + $"\"Resource\": \"{queueArn}\"," + "\"Condition\": {" + "\"ArnEquals\": {" + $"\"aws:SourceArn\": \"{topicArn}\"" + "}" + "}" + "}]" + "}"; var attributesResponse = await _amazonSQSClient.SetQueueAttributesAsync( new SetQueueAttributesRequest() { QueueUrl = queueUrl, Attributes = new Dictionary<string, string>() { { "Policy", queuePolicy } } }); return attributesResponse.HttpStatusCode == HttpStatusCode.OK; } /// <summary> /// Receive messages from a queue by its URL. /// </summary> /// <param name="queueUrl">The url of the queue.</param> /// <returns>The list of messages.</returns> public async Task<List<Message>> ReceiveMessagesByUrl(string queueUrl, int maxMessages) { // Setting WaitTimeSeconds to non-zero enables long polling. // For information about long polling, see // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html var messageResponse = await _amazonSQSClient.ReceiveMessageAsync( new ReceiveMessageRequest() { QueueUrl = queueUrl, MaxNumberOfMessages = maxMessages, WaitTimeSeconds = 1 }); return messageResponse.Messages; } /// <summary> /// Delete a batch of messages from a queue by its url. /// </summary> /// <param name="queueUrl">The url of the queue.</param> /// <returns>True if successful.</returns> public async Task<bool> DeleteMessageBatchByUrl(string queueUrl, List<Message> messages) { var deleteRequest = new DeleteMessageBatchRequest() { QueueUrl = queueUrl, Entries = new List<DeleteMessageBatchRequestEntry>() }; foreach (var message in messages) { deleteRequest.Entries.Add(new DeleteMessageBatchRequestEntry() { ReceiptHandle = message.ReceiptHandle, Id = message.MessageId }); } var deleteResponse = await _amazonSQSClient.DeleteMessageBatchAsync(deleteRequest); return deleteResponse.Failed.Any(); } /// <summary> /// Delete a queue by its URL. /// </summary> /// <param name="queueUrl">The url of the queue.</param> /// <returns>True if successful.</returns> public async Task<bool> DeleteQueueByUrl(string queueUrl) { var deleteResponse = await _amazonSQSClient.DeleteQueueAsync( new DeleteQueueRequest() { QueueUrl = queueUrl }); return deleteResponse.HttpStatusCode == HttpStatusCode.OK; } }

Amazon SNSオペレーションをラップするクラスを作成します。

/// <summary> /// Wrapper for Amazon Simple Notification Service (SNS) operations. /// </summary> public class SNSWrapper { private readonly IAmazonSimpleNotificationService _amazonSNSClient; /// <summary> /// Constructor for the Amazon SNS wrapper. /// </summary> /// <param name="amazonSQS">The injected Amazon SNS client.</param> public SNSWrapper(IAmazonSimpleNotificationService amazonSNS) { _amazonSNSClient = amazonSNS; } /// <summary> /// Create a new topic with a name and specific FIFO and de-duplication attributes. /// </summary> /// <param name="topicName">The name for the topic.</param> /// <param name="useFifoTopic">True to use a FIFO topic.</param> /// <param name="useContentBasedDeduplication">True to use content-based de-duplication.</param> /// <returns>The ARN of the new topic.</returns> public async Task<string> CreateTopicWithName(string topicName, bool useFifoTopic, bool useContentBasedDeduplication) { var createTopicRequest = new CreateTopicRequest() { Name = topicName, }; if (useFifoTopic) { // Update the name if it is not correct for a FIFO topic. if (!topicName.EndsWith(".fifo")) { createTopicRequest.Name = topicName + ".fifo"; } // Add the attributes from the method parameters. createTopicRequest.Attributes = new Dictionary<string, string> { { "FifoTopic", "true" } }; if (useContentBasedDeduplication) { createTopicRequest.Attributes.Add("ContentBasedDeduplication", "true"); } } var createResponse = await _amazonSNSClient.CreateTopicAsync(createTopicRequest); return createResponse.TopicArn; } /// <summary> /// Subscribe a queue to a topic with optional filters. /// </summary> /// <param name="topicArn">The ARN of the topic.</param> /// <param name="useFifoTopic">The optional filtering policy for the subscription.</param> /// <param name="queueArn">The ARN of the queue.</param> /// <returns>The ARN of the new subscription.</returns> public async Task<string> SubscribeTopicWithFilter(string topicArn, string? filterPolicy, string queueArn) { var subscribeRequest = new SubscribeRequest() { TopicArn = topicArn, Protocol = "sqs", Endpoint = queueArn }; if (!string.IsNullOrEmpty(filterPolicy)) { subscribeRequest.Attributes = new Dictionary<string, string> { { "FilterPolicy", filterPolicy } }; } var subscribeResponse = await _amazonSNSClient.SubscribeAsync(subscribeRequest); return subscribeResponse.SubscriptionArn; } /// <summary> /// Publish a message to a topic with an attribute and optional deduplication and group IDs. /// </summary> /// <param name="topicArn">The ARN of the topic.</param> /// <param name="message">The message to publish.</param> /// <param name="attributeName">The optional attribute for the message.</param> /// <param name="attributeValue">The optional attribute value for the message.</param> /// <param name="deduplicationId">The optional deduplication ID for the message.</param> /// <param name="groupId">The optional group ID for the message.</param> /// <returns>The ID of the message published.</returns> public async Task<string> PublishToTopicWithAttribute( string topicArn, string message, string? attributeName = null, string? attributeValue = null, string? deduplicationId = null, string? groupId = null) { var publishRequest = new PublishRequest() { TopicArn = topicArn, Message = message, MessageDeduplicationId = deduplicationId, MessageGroupId = groupId }; if (attributeValue != null) { // Add the string attribute if it exists. publishRequest.MessageAttributes = new Dictionary<string, MessageAttributeValue> { { attributeName!, new MessageAttributeValue() { StringValue = attributeValue, DataType = "String"} } }; } var publishResponse = await _amazonSNSClient.PublishAsync(publishRequest); return publishResponse.MessageId; } /// <summary> /// Unsubscribe from a topic by a subscription ARN. /// </summary> /// <param name="subscriptionArn">The ARN of the subscription.</param> /// <returns>True if successful.</returns> public async Task<bool> UnsubscribeByArn(string subscriptionArn) { var unsubscribeResponse = await _amazonSNSClient.UnsubscribeAsync( new UnsubscribeRequest() { SubscriptionArn = subscriptionArn }); return unsubscribeResponse.HttpStatusCode == HttpStatusCode.OK; } /// <summary> /// Delete a topic by its topic ARN. /// </summary> /// <param name="topicArn">The ARN of the topic.</param> /// <returns>True if successful.</returns> public async Task<bool> DeleteTopicByArn(string topicArn) { var deleteResponse = await _amazonSNSClient.DeleteTopicAsync( new DeleteTopicRequest() { TopicArn = topicArn }); return deleteResponse.HttpStatusCode == HttpStatusCode.OK; } }

次のコード例は、 の AWS Message Processing Framework を使用して Amazon SQS メッセージを発行および受信するアプリケーションを作成する方法を示していますNET。

AWS SDK for .NET

の AWS メッセージ処理フレームワークのチュートリアルを提供しますNET。このチュートリアルでは、ユーザーが Amazon SQS メッセージを発行できるウェブアプリケーションと、メッセージを受信するコマンドラインアプリケーションを作成します。

完全なソースコードとセットアップと実行の手順については、 AWS SDK for .NET 「 デベロッパーガイド」のチュートリアル全体と「」の例を参照してくださいGitHub

この例で使用されているサービス
  • Amazon SQS

サーバーレスサンプル

次のコード例は、SQSキューからメッセージを受信することでトリガーされるイベントを受信する Lambda 関数を実装する方法を示しています。この関数はイベントパラメータからメッセージを取得し、各メッセージの内容を記録します。

AWS SDK for .NET
注記

の詳細については、「」を参照してください GitHub。サーバーレスサンプルリポジトリで完全な例を検索し、設定および実行の方法を確認してください。

を使用して Lambda でSQSイベントを消費するNET。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using Amazon.Lambda.Core; using Amazon.Lambda.SQSEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace SqsIntegrationSampleCode { public async Task FunctionHandler(SQSEvent evnt, ILambdaContext context) { foreach (var message in evnt.Records) { await ProcessMessageAsync(message, context); } context.Logger.LogInformation("done"); } private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context) { try { context.Logger.LogInformation($"Processed message {message.Body}"); // TODO: Do interesting work based on the new message await Task.CompletedTask; } catch (Exception e) { //You can use Dead Letter Queue to handle failures. By configuring a Lambda DLQ. context.Logger.LogError($"An error occurred"); throw; } } }

次のコード例は、SQSキューからイベントを受信する Lambda 関数に部分的なバッチレスポンスを実装する方法を示しています。この関数は、レスポンスとしてバッチアイテムの失敗を報告し、対象のメッセージを後で再試行するよう Lambda に伝えます。

AWS SDK for .NET
注記

の詳細については、「」を参照してください GitHub。サーバーレスサンプルリポジトリで完全な例を検索し、設定および実行の方法を確認してください。

を使用して Lambda でSQSバッチ項目の失敗を報告するNET。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using Amazon.Lambda.Core; using Amazon.Lambda.SQSEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace sqsSample; public class Function { public async Task<SQSBatchResponse> FunctionHandler(SQSEvent evnt, ILambdaContext context) { List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new List<SQSBatchResponse.BatchItemFailure>(); foreach(var message in evnt.Records) { try { //process your message await ProcessMessageAsync(message, context); } catch (System.Exception) { //Add failed message identifier to the batchItemFailures list batchItemFailures.Add(new SQSBatchResponse.BatchItemFailure{ItemIdentifier=message.MessageId}); } } return new SQSBatchResponse(batchItemFailures); } private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context) { if (String.IsNullOrEmpty(message.Body)) { throw new Exception("No Body in SQS Message."); } context.Logger.LogInformation($"Processed message {message.Body}"); // TODO: Do interesting work based on the new message await Task.CompletedTask; } }