使用 Amazon 啟用用戶端緩衝和請求批次處理 SQS - Amazon Simple Queue Service

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 Amazon 啟用用戶端緩衝和請求批次處理 SQS

AWS SDK for Java 包含存取 Amazon AmazonSQSBufferedAsyncClient的 SQS。此用戶端允許使用用戶端緩衝進行簡單的請求批次處理。從用戶端發出的呼叫會先緩衝,然後以批次請求的形式傳送至 Amazon SQS。

用戶端緩衝允許最多 10 個請求作為批次請求進行緩衝和傳送,從而降低使用 Amazon 的成本SQS並減少傳送請求的數量。 AmazonSQSBufferedAsyncClient緩衝同步和非同步呼叫。批次處理的請求以及對長輪詢的支援也有助於提高傳輸量。如需詳細資訊,請參閱 使用 Amazon 水平擴展和動作批次處理來增加輸送量 SQS

由於 AmazonSQSBufferedAsyncClient 實作的介面與 AmazonSQSAsyncClient 相同,從 AmazonSQSAsyncClient 移轉到 AmazonSQSBufferedAsyncClient 通常只需對既有的程式碼進行最少的更動。

注意

Amazon SQS 緩衝的非同步用戶端目前不支援FIFO佇列。

使用 AmazonSQSBufferedAsyncClient

開始之前,請完成 設定 Amazon SQS 中的步驟。

AWS SDK 適用於 Java 1.x

對於 AWS SDK適用於 Java 1.x 的 ,您可以AmazonSQSBufferedAsyncClient根據下列範例建立新的 :

// Create the basic Amazon SQS async client final AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient(); // Create the buffered client final AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync);

建立新的 之後AmazonSQSBufferedAsyncClient,您可以使用它將多個請求傳送至 Amazon SQS(如同使用 一樣AmazonSQSAsyncClient),例如:

final CreateQueueRequest createRequest = new CreateQueueRequest().withQueueName("MyQueue"); final CreateQueueResult res = bufferedSqs.createQueue(createRequest); final SendMessageRequest request = new SendMessageRequest(); final String body = "Your message text" + System.currentTimeMillis(); request.setMessageBody( body ); request.setQueueUrl(res.getQueueUrl()); final Future<SendMessageResult> sendResult = bufferedSqs.sendMessageAsync(request); final ReceiveMessageRequest receiveRq = new ReceiveMessageRequest() .withMaxNumberOfMessages(1) .withQueueUrl(queueUrl); final ReceiveMessageResult rx = bufferedSqs.receiveMessage(receiveRq);

設定 AmazonSQSBufferedAsyncClient

AmazonSQSBufferedAsyncClient 已預先設為適合大多數使用案例的組態。您可以進一步設定 AmazonSQSBufferedAsyncClient,例如:

  1. 使用必要的組態參數,建立 QueueBufferConfig 類別的執行個體。

  2. AmazonSQSBufferedAsyncClient 建構函式提供此執行個體。

// Create the basic Amazon SQS async client final AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient(); final QueueBufferConfig config = new QueueBufferConfig() .withMaxInflightReceiveBatches(5) .withMaxDoneReceiveBatches(15); // Create the buffered client final AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync, config);
QueueBufferConfig 組態參數
參數 預設值 描述
longPoll true

longPoll 設為 trueAmazonSQSBufferedAsyncClient 在消費訊息時會嘗試使用長輪詢。

longPollWaitTimeoutSeconds 20 秒

在傳回空的接收結果前,ReceiveMessage 呼叫留置於伺服器上等待訊息出現在佇列中的時間上限 (單位為秒)。

注意

停用長輪詢時,此設定沒有作用。

maxBatchOpenMs 200 毫秒

傳出呼叫對於其他也要批次處理同類型訊息的呼叫稍作等待的時間上限 (單位為毫秒)。

此設定值越高,執行相同工作量所需的批次數就越少 (但是,批次的第一次呼叫就需要花越長時間等待)。

若將此參數設為 0,提交的請求便不會等待其他請求,實際上即是停用了批次處理功能。

maxBatchSize 每批次 10 個請求

單次請求同時批次處理的訊息數上限。此設定值越高,執行相同請求數所需的批次數量就越少。

注意

每個批次 10 個請求是 Amazon 允許的最大值SQS。

maxBatchSizeBytes 256 KiB

用戶端嘗試傳送至 Amazon 的訊息批次大小上限,以位元組為單位SQS。

注意

256 KiB 是 Amazon 允許的最大值SQS。

maxDoneReceiveBatches 10 個批次

AmazonSQSBufferedAsyncClient 在用戶端預取並存放的接收批次數上限。

設定愈高,接收請求就愈滿意,而不必呼叫 Amazon SQS(不過,預先擷取的訊息愈多,在緩衝區中保留的時間愈長,導致自己的可見性逾時過期)。

注意

0 表示已停用所有訊息預先擷取,且訊息只會隨需使用。

maxInflightOutboundBatches 5 個批次

可同時處理的活動中傳出批次的上限。

設定越高,可以傳送的傳出批次越快 (受配額限制,例如 CPU或頻寬限制),而 耗用的執行緒越多AmazonSQSBufferedAsyncClient

maxInflightReceiveBatches 10 個批次

可同時處理的活動中接收批次的上限。

設定越高,可以接收的訊息越多 (受配額限制,例如 CPU或頻寬限制),而 耗用的執行緒越多AmazonSQSBufferedAsyncClient

注意

0 表示已停用所有訊息預先擷取,且訊息只會隨需使用。

visibilityTimeoutSeconds -1

若將此參數設為非零正值,其設定的可見性逾時值就會覆寫消費訊息的佇列所設的可見性逾時值。

注意

-1 表示將選擇佇列預設的設定。

可見性逾時不可設為 0

AWS SDK 適用於 Java 2.x

對於 AWS SDK適用於 Java 2.x 的 ,您可以SqsAsyncBatchManager根據下列範例建立新的 :

// Create the basic Sqs Async Client SqsAsyncClient sqs = SqsAsyncClient.builder() .region(Region.US_EAST_1) .build(); // Create the batch manager SqsAsyncBatchManager sqsAsyncBatchManager = sqs.batchManager();

建立新的 之後SqsAsyncBatchManager,您可以使用它將多個請求傳送至 Amazon SQS(如同使用 一樣SqsAsyncClient),例如:

final String queueName = "MyAsyncBufferedQueue" + UUID.randomUUID(); final CreateQueueRequest request = CreateQueueRequest.builder().queueName(queueName).build(); final String queueUrl = sqs.createQueue(request).join().queueUrl(); System.out.println("Queue created: " + queueUrl); // Send messages CompletableFuture<SendMessageResponse> sendMessageFuture; for (int i = 0; i < 10; i++) { final int index = i; sendMessageFuture = sqsAsyncBatchManager.sendMessage( r -> r.messageBody("Message " + index).queueUrl(queueUrl)); SendMessageResponse response= sendMessageFuture.join(); System.out.println("Message " + response.messageId() + " sent!"); } // Receive messages with customized configurations CompletableFuture<ReceiveMessageResponse> receiveResponseFuture = customizedBatchManager.receiveMessage( r -> r.queueUrl(queueUrl) .waitTimeSeconds(10) .visibilityTimeout(20) .maxNumberOfMessages(10) ); System.out.println("You have received " + receiveResponseFuture.join().messages().size() + " messages in total."); // Delete messages DeleteQueueRequest deleteQueueRequest = DeleteQueueRequest.builder().queueUrl(queueUrl).build(); int code = sqs.deleteQueue(deleteQueueRequest).join().sdkHttpResponse().statusCode(); System.out.println("Queue is deleted, with statusCode " + code);

設定 SqsAsyncBatchManager

SqsAsyncBatchManager 已預先設為適合大多數使用案例的組態。您可以進一步設定 SqsAsyncBatchManager,例如:

透過 建立自訂組態SqsAsyncBatchManager.Builder

SqsAsyncBatchManager customizedBatchManager = SqsAsyncBatchManager.builder() .client(sqs) .scheduledExecutor(Executors.newScheduledThreadPool(5)) .overrideConfiguration(b -> b .maxBatchSize(10) .sendRequestFrequency(Duration.ofMillis(200)) .receiveMessageMinWaitDuration(Duration.ofSeconds(10)) .receiveMessageVisibilityTimeout(Duration.ofSeconds(20)) .receiveMessageAttributeNames(Collections.singletonList("*")) .receiveMessageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.ALL))) .build();
BatchOverrideConfiguration 參數
參數 預設值 描述
maxBatchSize

每批次 10 個請求

單次請求同時批次處理的訊息數上限。此設定值越高,執行相同請求數所需的批次數量就越少。

注意

Amazon 允許的最大值SQS為每個批次 10 個請求。

sendRequestFrequency

200 毫秒

傳出呼叫對於其他也要批次處理同類型訊息的呼叫稍作等待的時間上限 (單位為毫秒)。

此設定值越高,執行相同工作量所需的批次數就越少 (但是,批次的第一次呼叫就需要花越長時間等待)。

若將此參數設為 0,提交的請求便不會等待其他請求,實際上即是停用了批次處理功能。

receiveMessageVisibilityTimeout

-1

若將此參數設為非零正值,其設定的可見性逾時值就會覆寫消費訊息的佇列所設的可見性逾時值。

注意

1 表示將選擇佇列預設的設定。可見性逾時不可設為 0

receiveMessageMinWaitDuration

50 毫秒

receiveMessage 呼叫等待擷取可用訊息的時間下限 (以毫秒為單位)。設定越高,執行相同數量的請求所需的批次就越少。