使用适用于 Amazon SQS 的自动请求批处理 AWS SDK for Java 2.x - AWS SDK for Java 2.x

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用适用于 Amazon SQS 的自动请求批处理 AWS SDK for Java 2.x

Amazon API 的自动请求批处理SQS是一个高级库,它为批处理和缓冲SQS操作请求提供了一种有效的方法。通过使用批处理API,可以将请求数量减少到SQS,从而提高吞吐量并最大限度地降低成本。

由于批处理API方法与方法receiveMessagesendMessage、、changeMessageVisibility、)相匹配deleteMessage,因此您可以将该批次API用作即用替代SqsAsyncClient方法,只需进行最少的更改。

本主题概述了如何配置和使用API适用于 Amazon SQS 的自动请求批处理。

检查先决条件

您需要使用SDK适用于 Java 2.x 的 2.28.0 或更高版本才能访问批处理。API你的 Maven 至少pom.xml应该包含以下元素。

<dependencyManagement> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>bom</artifactId> <version>2.28.231</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>sqs</artifactId> </dependency> </dependencies>

1 最新版本

创建批处理管理器

自动批处理API请求由SqsAsyncBatchManager接口实现。您可以通过几种方式创建管理器实例。

1. 使用默认配置 SqsAsyncClient

创建批处理管理器的最简单方法是在现有SqsAsyncClient实例上调用batchManager工厂方法。以下片段显示了简单的方法。

SqsAsyncClient asyncClient = SqsAsyncClient.create(); SqsAsyncBatchManager sqsAsyncBatchManager = asyncClient.batchManager();

当您使用这种方法时,SqsAsyncBatchManager实例将使用该覆盖的配置设置 SqsAsyncBatchManager部分表格中显示的默认值。此外,该SqsAsyncBatchManager实例使用从中创建它的SqsAsyncClient实例的。ExecutorService

1. 使用自定义配置 SqsAsyncBatchManager.Builder

对于更高级的用例,您可以使用自定义批处理管理器SqsAsyncBatchManager.Builder。通过使用这种方法创建SqsAsyncBatchManager实例,您可以微调批处理行为。以下代码段显示了如何使用构建器自定义批处理行为的示例。

SqsAsyncBatchManager batchManager = SqsAsyncBatchManager.builder() .client(SqsAsyncClient.create()) .scheduledExecutor(Executors.newScheduledThreadPool(5)) .overrideConfiguration(b -> b .receiveMessageMinWaitDuration(Duration.ofSeconds(10)) .receiveMessageVisibilityTimeout(Duration.ofSeconds(1)) .receiveMessageAttributeNames(Collections.singletonList("*")) .receiveMessageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.ALL))) .build();

使用这种方法时,您可以调整该覆盖的配置设置 SqsAsyncBatchManager部分表格中显示的BatchOverrideConfiguration对象的设置。您也可以使用这种方法ScheduledExecutorService为批处理管理器提供自定义。

发送消息

要使用批处理管理器发送消息,请使用SqsAsyncBatchManager#sendMessage方法。缓SDK冲区请求并在达到maxBatchSizesendRequestFrequency值时将其作为批量发送。

以下示例显示了sendMessage紧随其后的是另一个请求的请求。在本例中,SDK将以单个批次发送这两条消息。

// Sending the first message CompletableFuture<SendMessageResponse> futureOne = sqsAsyncBatchManager.sendMessage(r -> r.messageBody("One").queueUrl("queue")); // Sending the second message CompletableFuture<SendMessageResponse> futureTwo = sqsAsyncBatchManager.sendMessage(r -> r.messageBody("Two").queueUrl("queue")); // Waiting for both futures to complete and retrieving the responses SendMessageResponse messageOne = futureOne.join(); SendMessageResponse messageTwo = futureTwo.join();

更改消息可见性超时

您可以使用SqsAsyncBatchManager#changeMessageVisibility方法批量更改消息的可见性超时。缓SDK冲区请求并在达到maxBatchSizesendRequestFrequency值时将其作为批量发送。

以下示例说明如何调用该changeMessageVisibility方法。

CompletableFuture<ChangeMessageVisibilityResponse> futureOne = sqsAsyncBatchManager.changeMessageVisibility(r -> r.receiptHandle("receiptHandle") .queueUrl("queue")); ChangeMessageVisibilityResponse response = futureOne.join();

删除消息

您可以使用SqsAsyncBatchManager#deleteMessage方法批量删除消息。缓SDK冲区请求并在达到maxBatchSizesendRequestFrequency值时将其作为批量发送。

以下示例显示了如何调用该deleteMessage方法。

CompletableFuture<DeleteMessageResponse> futureOne = sqsAsyncBatchManager.deleteMessage(r -> r.receiptHandle("receiptHandle") .queueUrl("queue")); DeleteMessageResponse response = futureOne.join();

接收消息

使用默认设置

当您在应用程序中轮询该SqsAsyncBatchManager#receiveMessage方法时,批处理管理器会从其内部缓冲区获取消息,然后在后台SDK自动更新这些消息。

以下示例说明如何调用该receiveMessage方法。

CompletableFuture<ReceiveMessageResponse> responseFuture = sqsAsyncBatchManager.receiveMessage(r -> r.queueUrl("queueUrl"));

使用自定义设置

如果您想进一步自定义请求,例如通过设置自定义等待时间和指定要检索的消息数量,则可以自定义请求,如以下示例所示。

CompletableFuture<ReceiveMessageResponse> response = sqsAsyncBatchManager.receiveMessage(r -> r.queueUrl("queueUrl") .waitTimeSeconds(5) .visibilityTimeout(20));
注意

如果您receiveMessage使用ReceiveMessageRequest包含以下任何参数的调用,则会SDK绕过批处理管理器并发送常规异步receiveMessage请求:

  • messageAttributeNames

  • messageSystemAttributeNames

  • messageSystemAttributeNamesWithStrings

  • overrideConfiguration

覆盖的配置设置 SqsAsyncBatchManager

创建SqsAsyncBatchManager实例时,您可以调整以下设置。上提供了以下设置列表BatchOverrideConfiguration.Builder

设置 描述 默认值
maxBatchSize SendMessageBatchRequestChangeMessageVisibilityBatchRequest或每批次的最大请求数DeleteMessageBatchRequest。最大值为 10。 10
sendRequestFrequency

发送批次之前的时间,maxBatchSize除非提前到达。较高的值可能会减少请求,但会增加延迟。

200 毫秒
receiveMessageVisibilityTimeout 消息的可见性超时。如果未设置,则使用队列的默认值。 队列的默认值
receiveMessageMinWaitDuration receiveMessage请求的最短等待时间。避免设置为 0 以防止CPU浪费。 50 毫秒
receiveMessageSystemAttributeNames 请求receiveMessage呼叫的系统属性名称列表。
receiveMessageAttributeNames 请求receiveMessage呼叫的属性名称列表。