Consume messages with the AWS Message Processing Framework for .NET
Note
This is prerelease documentation for a feature in preview release. It is subject to change.
The AWS Message Processing Framework for .NET allows you to consume messages that have been published by using the framework or one of the messaging services. The messages can be consumed in a variety of ways, some of which are described below.
Message Handlers
To consume messages, implement a message handler using the IMessageHandler
interface
for each message type you wish to process. The mapping between message types and message handlers is
configured in the project startup.
await Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
// Register the AWS Message Processing Framework for .NET
services.AddAWSMessageBus(builder =>
{
// Register an SQS Queue that the framework will poll for messages.
// NOTE: The URL given below is an example. Use the appropriate URL for your SQS Queue.
builder.AddSQSPoller("https://sqs.us-west-2.amazonaws.com/012345678910/MyAppProd");
// Register all IMessageHandler implementations with the message type they should process.
// Here messages that match our ChatMessage .NET type will be handled by our ChatMessageHandler
builder.AddMessageHandler<ChatMessageHandler, ChatMessage>();
});
})
.Build()
.RunAsync();
The following code shows a sample message handler for a ChatMessage
message.
public class ChatMessageHandler : IMessageHandler<ChatMessage>
{
public Task<MessageProcessStatus> HandleAsync(MessageEnvelope<ChatMessage> messageEnvelope, CancellationToken token = default)
{
// Add business and validation logic here.
if (messageEnvelope == null)
{
return Task.FromResult(MessageProcessStatus.Failed());
}
if (messageEnvelope.Message == null)
{
return Task.FromResult(MessageProcessStatus.Failed());
}
ChatMessage message = messageEnvelope.Message;
Console.WriteLine($"Message Description: {message.MessageDescription}");
// Return success so the framework will delete the message from the queue.
return Task.FromResult(MessageProcessStatus.Success());
}
}
The outer MessageEnvelope
contains metadata used by the framework. Its
message
property is the message type (in this case ChatMessage
).
You can return MessageProcessStatus.Success()
to indicate that the message was
processed successfully and the framework will delete the message from the Amazon SQS queue. When returning
MessageProcessStatus.Failed()
, the message will remain in the queue where it can be
processed again or moved to a dead-letter queue, if configured.
Handling Messages in a Long-Running Process
You can call AddSQSPoller
with an SQS queue URL to start a long-running BackgroundService
await Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
// Register the AWS Message Processing Framework for .NET
services.AddAWSMessageBus(builder =>
{
// Register an SQS Queue that the framework will poll for messages.
// NOTE: The URL given below is an example. Use the appropriate URL for your SQS Queue.
builder.AddSQSPoller("https://sqs.us-west-2.amazonaws.com/012345678910/MyAppProd", options =>
{
// The maximum number of messages from this queue that the framework will process concurrently on this client.
options.MaxNumberOfConcurrentMessages = 10;
// The duration each call to SQS will wait for new messages.
options.WaitTimeSeconds = 20;
});
// Register all IMessageHandler implementations with the message type they should process.
builder.AddMessageHandler<ChatMessageHandler, ChatMessage>();
});
})
.Build()
.RunAsync();
Configuring the SQS Message Poller
The SQS message poller can be configured by the SQSMessagePollerOptions
when calling
AddSQSPoller
.
-
MaxNumberOfConcurrentMessages
- The maximum number of messages from the queue to process concurrently. The default value is 10. -
WaitTimeSeconds
- The duration (in seconds) for which theReceiveMessage
SQS call waits for a message to arrive in the queue before returning. If a message is available, the call returns sooner thanWaitTimeSeconds
. The default value is 20.
Message Visibility Timeout Handling
SQS messages have a visibility timeout period. When one consumer begins handling a given message, it remains in the queue but is hidden from other consumers to avoid processing it more than once. If the message is not handled and deleted before becoming visible again, another consumer might attempt to handle the same message.
The framework will track and attempt to extend the visibility timeout for messages that it is
currently handling. You can configure this behavior on the SQSMessagePollerOptions
when
calling AddSQSPoller
.
-
VisibilityTimeout
- The duration in seconds that received messages are hidden from subsequent retrieve requests. The default value is 30. -
VisibilityTimeoutExtensionThreshold
- When a message's visibility timeout is within this many seconds of expiring, the framework will extend the visibility timeout (by anotherVisibilityTimeout
seconds). The default value is 5. -
VisibilityTimeoutExtensionHeartbeatInterval
- How often in seconds that the framework will check for messages that are withinVisibilityTimeoutExtensionThreshold
seconds of expiring, and then extend their visibility timeout. The default value is 1.
In the following example, the framework will check every 1 second for messages that are still being handled. For those messages within 5 seconds of becoming visible again, the framework will automatically extend the visibility timeout of each message by another 30 seconds.
// NOTE: The URL given below is an example. Use the appropriate URL for your SQS Queue.
builder.AddSQSPoller("https://sqs.us-west-2.amazonaws.com/012345678910/MyAppProd", options =>
{
options.VisibilityTimeout = 30;
options.VisibilityTimeoutExtensionThreshold = 5;
VisibilityTimeoutExtensionHeartbeatInterval = 1;
});
Handling messages in AWS Lambda functions
You can use the AWS Message Processing Framework for .NET with SQS's integration with Lambda. This is
provided by the AWS.Messaging.Lambda
package. Refer to its README