本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用适用于 Amazon SQS 的自动请求批处理 AWS SDK for Java 2.x
Amazon API 的自动请求批处理SQS是一个高级库,它为批处理和缓冲SQS操作请求提供了一种有效的方法。通过使用批处理API,可以将请求数量减少到SQS,从而提高吞吐量并最大限度地降低成本。
由于批处理API方法与方法receiveMessage
(sendMessage
、、changeMessageVisibility
、)相匹配deleteMessage
,因此您可以将该批次API用作即用替代SqsAsyncClient
方法,只需进行最少的更改。
本主题概述了如何配置和使用API适用于 Amazon SQS 的自动请求批处理。
检查先决条件
您需要使用SDK适用于 Java 2.x 的 2.28.0 或更高版本才能访问批处理。API你的 Maven 至少pom.xml
应该包含以下元素。
<dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>2.28.231
</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
</dependency>
</dependencies>
1 最新版本
创建批处理管理器
自动批处理API请求由SqsAsyncBatchManager
1. 使用默认配置 SqsAsyncClient
创建批处理管理器的最简单方法是在现有SqsAsyncClientbatchManager
工厂方法。以下片段显示了简单的方法。
SqsAsyncClient asyncClient = SqsAsyncClient.create(); SqsAsyncBatchManager sqsAsyncBatchManager = asyncClient.batchManager();
当您使用这种方法时,SqsAsyncBatchManager
实例将使用该覆盖的配置设置 SqsAsyncBatchManager部分表格中显示的默认值。此外,该SqsAsyncBatchManager
实例使用从中创建它的SqsAsyncClient
实例的。ExecutorService
1. 使用自定义配置 SqsAsyncBatchManager.Builder
对于更高级的用例,您可以使用自定义批处理管理器SqsAsyncBatchManager.Builder
SqsAsyncBatchManager
实例,您可以微调批处理行为。以下代码段显示了如何使用构建器自定义批处理行为的示例。
SqsAsyncBatchManager batchManager = SqsAsyncBatchManager.builder() .client(SqsAsyncClient.create()) .scheduledExecutor(Executors.newScheduledThreadPool(5)) .overrideConfiguration(b -> b .receiveMessageMinWaitDuration(Duration.ofSeconds(10)) .receiveMessageVisibilityTimeout(Duration.ofSeconds(1)) .receiveMessageAttributeNames(Collections.singletonList("*")) .receiveMessageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.ALL))) .build();
使用这种方法时,您可以调整该覆盖的配置设置 SqsAsyncBatchManager部分表格中显示的BatchOverrideConfiguration
对象的设置。您也可以使用这种方法ScheduledExecutorService
发送消息
要使用批处理管理器发送消息,请使用SqsAsyncBatchManager#sendMessage
方法。缓SDK冲区请求并在达到maxBatchSize
或sendRequestFrequency
值时将其作为批量发送。
以下示例显示了sendMessage
紧随其后的是另一个请求的请求。在本例中,SDK将以单个批次发送这两条消息。
// Sending the first message CompletableFuture<SendMessageResponse> futureOne = sqsAsyncBatchManager.sendMessage(r -> r.messageBody("One").queueUrl("queue")); // Sending the second message CompletableFuture<SendMessageResponse> futureTwo = sqsAsyncBatchManager.sendMessage(r -> r.messageBody("Two").queueUrl("queue")); // Waiting for both futures to complete and retrieving the responses SendMessageResponse messageOne = futureOne.join(); SendMessageResponse messageTwo = futureTwo.join();
更改消息可见性超时
您可以使用SqsAsyncBatchManager#changeMessageVisibility
maxBatchSize
或sendRequestFrequency
值时将其作为批量发送。
以下示例说明如何调用该changeMessageVisibility
方法。
CompletableFuture<ChangeMessageVisibilityResponse> futureOne = sqsAsyncBatchManager.changeMessageVisibility(r -> r.receiptHandle("receiptHandle") .queueUrl("queue")); ChangeMessageVisibilityResponse response = futureOne.join();
删除消息
您可以使用SqsAsyncBatchManager#deleteMessage
maxBatchSize
或sendRequestFrequency
值时将其作为批量发送。
以下示例显示了如何调用该deleteMessage
方法。
CompletableFuture<DeleteMessageResponse> futureOne = sqsAsyncBatchManager.deleteMessage(r -> r.receiptHandle("receiptHandle") .queueUrl("queue")); DeleteMessageResponse response = futureOne.join();
接收消息
使用默认设置
当您在应用程序中轮询该SqsAsyncBatchManager#receiveMessage
方法时,批处理管理器会从其内部缓冲区获取消息,然后在后台SDK自动更新这些消息。
以下示例说明如何调用该receiveMessage
方法。
CompletableFuture<ReceiveMessageResponse> responseFuture = sqsAsyncBatchManager.receiveMessage(r -> r.queueUrl("queueUrl"));
使用自定义设置
如果您想进一步自定义请求,例如通过设置自定义等待时间和指定要检索的消息数量,则可以自定义请求,如以下示例所示。
CompletableFuture<ReceiveMessageResponse> response = sqsAsyncBatchManager.receiveMessage(r -> r.queueUrl("queueUrl") .waitTimeSeconds(5) .visibilityTimeout(20));
注意
如果您receiveMessage
使用ReceiveMessageRequest
包含以下任何参数的调用,则会SDK绕过批处理管理器并发送常规异步receiveMessage
请求:
-
messageAttributeNames
-
messageSystemAttributeNames
-
messageSystemAttributeNamesWithStrings
-
overrideConfiguration
覆盖的配置设置 SqsAsyncBatchManager
创建SqsAsyncBatchManager
实例时,您可以调整以下设置。上提供了以下设置列表BatchOverrideConfiguration.Builder
设置 | 描述 | 默认值 |
---|---|---|
maxBatchSize |
SendMessageBatchRequest 、ChangeMessageVisibilityBatchRequest 或每批次的最大请求数DeleteMessageBatchRequest 。最大值为 10。 |
10 |
sendRequestFrequency |
发送批次之前的时间, |
200 毫秒 |
receiveMessageVisibilityTimeout |
消息的可见性超时。如果未设置,则使用队列的默认值。 | 队列的默认值 |
receiveMessageMinWaitDuration |
receiveMessage 请求的最短等待时间。避免设置为 0 以防止CPU浪费。 |
50 毫秒 |
receiveMessageSystemAttributeNames |
请求receiveMessage 呼叫的系统属性名称 |
无 |
receiveMessageAttributeNames |
请求receiveMessage 呼叫的属性名称列表。 |
无 |