本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
使用 Amazon 啟用用戶端緩衝和請求批次處理 SQS
AWS SDK for JavaAmazonSQSBufferedAsyncClient
的 SQS。此用戶端允許使用用戶端緩衝進行簡單的請求批次處理。從用戶端發出的呼叫會先緩衝,然後以批次請求的形式傳送至 Amazon SQS。
用戶端緩衝允許最多 10 個請求作為批次請求進行緩衝和傳送,從而降低使用 Amazon 的成本SQS並減少傳送請求的數量。 AmazonSQSBufferedAsyncClient
緩衝同步和非同步呼叫。批次處理的請求以及對長輪詢的支援也有助於提高傳輸量。如需詳細資訊,請參閱 使用 Amazon 水平擴展和動作批次處理來增加輸送量 SQS。
由於 AmazonSQSBufferedAsyncClient
實作的介面與 AmazonSQSAsyncClient
相同,從 AmazonSQSAsyncClient
移轉到 AmazonSQSBufferedAsyncClient
通常只需對既有的程式碼進行最少的更動。
注意
Amazon SQS 緩衝的非同步用戶端目前不支援FIFO佇列。
使用 AmazonSQSBufferedAsyncClient
開始之前,請完成 設定 Amazon SQS 中的步驟。
AWS SDK 適用於 Java 1.x
對於 AWS SDK適用於 Java 1.x 的 ,您可以AmazonSQSBufferedAsyncClient
根據下列範例建立新的 :
// Create the basic Amazon SQS async client final AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient(); // Create the buffered client final AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync);
建立新的 之後AmazonSQSBufferedAsyncClient
,您可以使用它將多個請求傳送至 Amazon SQS(如同使用 一樣AmazonSQSAsyncClient
),例如:
final CreateQueueRequest createRequest = new CreateQueueRequest().withQueueName("MyQueue"); final CreateQueueResult res = bufferedSqs.createQueue(createRequest); final SendMessageRequest request = new SendMessageRequest(); final String body = "Your message text" + System.currentTimeMillis(); request.setMessageBody( body ); request.setQueueUrl(res.getQueueUrl()); final Future<SendMessageResult> sendResult = bufferedSqs.sendMessageAsync(request); final ReceiveMessageRequest receiveRq = new ReceiveMessageRequest() .withMaxNumberOfMessages(1) .withQueueUrl(queueUrl); final ReceiveMessageResult rx = bufferedSqs.receiveMessage(receiveRq);
設定 AmazonSQSBufferedAsyncClient
AmazonSQSBufferedAsyncClient
已預先設為適合大多數使用案例的組態。您可以進一步設定 AmazonSQSBufferedAsyncClient
,例如:
-
使用必要的組態參數,建立
QueueBufferConfig
類別的執行個體。 -
對
AmazonSQSBufferedAsyncClient
建構函式提供此執行個體。
// Create the basic Amazon SQS async client final AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient(); final QueueBufferConfig config = new QueueBufferConfig() .withMaxInflightReceiveBatches(5) .withMaxDoneReceiveBatches(15); // Create the buffered client final AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync, config);
參數 | 預設值 | 描述 |
---|---|---|
longPoll |
true |
若 |
longPollWaitTimeoutSeconds |
20 秒 |
在傳回空的接收結果前, 注意停用長輪詢時,此設定沒有作用。 |
maxBatchOpenMs |
200 毫秒 |
傳出呼叫對於其他也要批次處理同類型訊息的呼叫稍作等待的時間上限 (單位為毫秒)。 此設定值越高,執行相同工作量所需的批次數就越少 (但是,批次的第一次呼叫就需要花越長時間等待)。 若將此參數設為 |
maxBatchSize |
每批次 10 個請求 |
單次請求同時批次處理的訊息數上限。此設定值越高,執行相同請求數所需的批次數量就越少。 注意每個批次 10 個請求是 Amazon 允許的最大值SQS。 |
maxBatchSizeBytes |
256 KiB |
用戶端嘗試傳送至 Amazon 的訊息批次大小上限,以位元組為單位SQS。 注意256 KiB 是 Amazon 允許的最大值SQS。 |
maxDoneReceiveBatches |
10 個批次 |
設定愈高,接收請求就愈滿意,而不必呼叫 Amazon SQS(不過,預先擷取的訊息愈多,在緩衝區中保留的時間愈長,導致自己的可見性逾時過期)。 注意
|
maxInflightOutboundBatches |
5 個批次 |
可同時處理的活動中傳出批次的上限。 設定越高,可以傳送的傳出批次越快 (受配額限制,例如 CPU或頻寬限制),而 耗用的執行緒越多 |
maxInflightReceiveBatches |
10 個批次 |
可同時處理的活動中接收批次的上限。 設定越高,可以接收的訊息越多 (受配額限制,例如 CPU或頻寬限制),而 耗用的執行緒越多 注意
|
visibilityTimeoutSeconds |
-1 |
若將此參數設為非零正值,其設定的可見性逾時值就會覆寫消費訊息的佇列所設的可見性逾時值。 注意
可見性逾時不可設為 |
AWS SDK 適用於 Java 2.x
對於 AWS SDK適用於 Java 2.x 的 ,您可以SqsAsyncBatchManager
根據下列範例建立新的 :
// Create the basic Sqs Async Client SqsAsyncClient sqs = SqsAsyncClient.builder() .region(Region.US_EAST_1) .build(); // Create the batch manager SqsAsyncBatchManager sqsAsyncBatchManager = sqs.batchManager();
建立新的 之後SqsAsyncBatchManager
,您可以使用它將多個請求傳送至 Amazon SQS(如同使用 一樣SqsAsyncClient
),例如:
final String queueName = "MyAsyncBufferedQueue" + UUID.randomUUID(); final CreateQueueRequest request = CreateQueueRequest.builder().queueName(queueName).build(); final String queueUrl = sqs.createQueue(request).join().queueUrl(); System.out.println("Queue created: " + queueUrl); // Send messages CompletableFuture<SendMessageResponse> sendMessageFuture; for (int i = 0; i < 10; i++) { final int index = i; sendMessageFuture = sqsAsyncBatchManager.sendMessage( r -> r.messageBody("Message " + index).queueUrl(queueUrl)); SendMessageResponse response= sendMessageFuture.join(); System.out.println("Message " + response.messageId() + " sent!"); } // Receive messages with customized configurations CompletableFuture<ReceiveMessageResponse> receiveResponseFuture = customizedBatchManager.receiveMessage( r -> r.queueUrl(queueUrl) .waitTimeSeconds(10) .visibilityTimeout(20) .maxNumberOfMessages(10) ); System.out.println("You have received " + receiveResponseFuture.join().messages().size() + " messages in total."); // Delete messages DeleteQueueRequest deleteQueueRequest = DeleteQueueRequest.builder().queueUrl(queueUrl).build(); int code = sqs.deleteQueue(deleteQueueRequest).join().sdkHttpResponse().statusCode(); System.out.println("Queue is deleted, with statusCode " + code);
設定 SqsAsyncBatchManager
SqsAsyncBatchManager
已預先設為適合大多數使用案例的組態。您可以進一步設定 SqsAsyncBatchManager
,例如:
透過 建立自訂組態SqsAsyncBatchManager.Builder
:
SqsAsyncBatchManager customizedBatchManager = SqsAsyncBatchManager.builder() .client(sqs) .scheduledExecutor(Executors.newScheduledThreadPool(5)) .overrideConfiguration(b -> b .maxBatchSize(10) .sendRequestFrequency(Duration.ofMillis(200)) .receiveMessageMinWaitDuration(Duration.ofSeconds(10)) .receiveMessageVisibilityTimeout(Duration.ofSeconds(20)) .receiveMessageAttributeNames(Collections.singletonList("*")) .receiveMessageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.ALL))) .build();
參數 | 預設值 | 描述 |
---|---|---|
maxBatchSize |
每批次 10 個請求 |
單次請求同時批次處理的訊息數上限。此設定值越高,執行相同請求數所需的批次數量就越少。 注意Amazon 允許的最大值SQS為每個批次 10 個請求。 |
sendRequestFrequency |
200 毫秒 |
傳出呼叫對於其他也要批次處理同類型訊息的呼叫稍作等待的時間上限 (單位為毫秒)。 此設定值越高,執行相同工作量所需的批次數就越少 (但是,批次的第一次呼叫就需要花越長時間等待)。 若將此參數設為 |
receiveMessageVisibilityTimeout |
-1 |
若將此參數設為非零正值,其設定的可見性逾時值就會覆寫消費訊息的佇列所設的可見性逾時值。 注意 |
receiveMessageMinWaitDuration |
50 毫秒 |
|