SQS で Amazon の自動リクエストバッチ処理を使用する AWS SDK for Java 2.x - AWS SDK for Java 2.x

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

SQS で Amazon の自動リクエストバッチ処理を使用する AWS SDK for Java 2.x

Amazon の自動リクエストバッチ処理SQSは、SQSオペレーションAPIのリクエストをバッチ処理およびバッファ処理する効率的な方法を提供する高レベルライブラリです。バッチ処理 を使用するとAPI、 へのリクエストの数が減りSQS、スループットが向上し、コストが最小限に抑えられます。

バッチAPIメソッドはSqsAsyncClientメソッド 、sendMessagechangeMessageVisibilityと一致するためdeleteMessage、最小限の変更でバッチをドロップイン置換APIとしてreceiveMessage使用できます。

このトピックでは、APIAmazon の自動リクエストバッチを設定して使用する方法の概要を説明しますSQS。

前提条件をチェックする

バッチ処理 にアクセスするには、Java 2.x 用 のバージョン 2.28.0 以降を使用する必要がありますAPI。 SDKMaven には、少なくとも次の要素が含まれているpom.xml必要があります。

<dependencyManagement> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>bom</artifactId> <version>2.28.231</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>sqs</artifactId> </dependency> </dependencies>

1 最新バージョン

バッチマネージャーを作成する

自動リクエストバッチ処理APIは、 SqsAsyncBatchManagerインターフェイスによって実装されます。マネージャーのインスタンスは、いくつかの方法で作成できます。

を使用したデフォルト設定 SqsAsyncClient

バッチマネージャーを作成する最も簡単な方法は、既存のSqsAsyncClientインスタンスでbatchManagerファクトリメソッドを呼び出すことです。シンプルなアプローチを次のスニペットに示します。

SqsAsyncClient asyncClient = SqsAsyncClient.create(); SqsAsyncBatchManager sqsAsyncBatchManager = asyncClient.batchManager();

この方法を使用する場合、SqsAsyncBatchManagerインスタンスは の設定を上書きする SqsAsyncBatchManagerセクションの表に示されているデフォルト値を使用します。さらに、SqsAsyncBatchManagerインスタンスは、作成されたSqsAsyncClientインスタンスExecutorServiceの を使用します。

を使用したカスタム設定 SqsAsyncBatchManager.Builder

より高度なユースケースでは、 を使用してバッチマネージャーをカスタマイズできますSqsAsyncBatchManager.Builder。このアプローチを使用してSqsAsyncBatchManagerインスタンスを作成すると、バッチ処理の動作を微調整できます。次のスニペットは、ビルダーを使用してバッチ処理の動作をカスタマイズする方法の例を示しています。

SqsAsyncBatchManager batchManager = SqsAsyncBatchManager.builder() .client(SqsAsyncClient.create()) .scheduledExecutor(Executors.newScheduledThreadPool(5)) .overrideConfiguration(b -> b .receiveMessageMinWaitDuration(Duration.ofSeconds(10)) .receiveMessageVisibilityTimeout(Duration.ofSeconds(1)) .receiveMessageAttributeNames(Collections.singletonList("*")) .receiveMessageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.ALL))) .build();

この方法を使用すると、 の設定を上書きする SqsAsyncBatchManagerセクションの表に表示される BatchOverrideConfiguration オブジェクトの設定を調整できます。このアプローチを使用して、バッチマネージャーScheduledExecutorServiceにカスタム を指定することもできます。

メッセージを送信する

バッチマネージャーでメッセージを送信するには、 SqsAsyncBatchManager#sendMessageメソッドを使用します。はリクエストをSDKバッファし、 maxBatchSizeまたは sendRequestFrequencyの値に達するとバッチとして送信します。

次の例は、別のsendMessageリクエストの直後のリクエストを示しています。この場合、 は両方のメッセージを 1 つのバッチでSDK送信します。

// Sending the first message CompletableFuture<SendMessageResponse> futureOne = sqsAsyncBatchManager.sendMessage(r -> r.messageBody("One").queueUrl("queue")); // Sending the second message CompletableFuture<SendMessageResponse> futureTwo = sqsAsyncBatchManager.sendMessage(r -> r.messageBody("Two").queueUrl("queue")); // Waiting for both futures to complete and retrieving the responses SendMessageResponse messageOne = futureOne.join(); SendMessageResponse messageTwo = futureTwo.join();

メッセージの可視性タイムアウトを変更する

SqsAsyncBatchManager#changeMessageVisibility メソッドを使用して、バッチ内のメッセージの可視性タイムアウトを変更できます。はリクエストをSDKバッファし、 maxBatchSizeまたは sendRequestFrequencyの値に達するとバッチとして送信します。

次の例は、 changeMessageVisibilityメソッドを呼び出す方法を示しています。

CompletableFuture<ChangeMessageVisibilityResponse> futureOne = sqsAsyncBatchManager.changeMessageVisibility(r -> r.receiptHandle("receiptHandle") .queueUrl("queue")); ChangeMessageVisibilityResponse response = futureOne.join();

メッセージの削除

SqsAsyncBatchManager#deleteMessage メソッドを使用して、バッチ内のメッセージを削除できます。はリクエストをSDKバッファし、 maxBatchSizeまたは sendRequestFrequencyの値に達するとバッチとして送信します。

次の例は、 deleteMessageメソッドを呼び出す方法を示しています。

CompletableFuture<DeleteMessageResponse> futureOne = sqsAsyncBatchManager.deleteMessage(r -> r.receiptHandle("receiptHandle") .queueUrl("queue")); DeleteMessageResponse response = futureOne.join();

メッセージを受信する

デフォルト設定を使用する

アプリケーションで SqsAsyncBatchManager#receiveMessageメソッドをポーリングすると、バッチマネージャーは内部バッファからメッセージを取得し、 SDKは自動的にバックグラウンドで更新します。

次の例は、 receiveMessageメソッドを呼び出す方法を示しています。

CompletableFuture<ReceiveMessageResponse> responseFuture = sqsAsyncBatchManager.receiveMessage(r -> r.queueUrl("queueUrl"));

カスタム設定を使用する

例えば、カスタム待機時間を設定し、取得するメッセージの数を指定してリクエストをさらにカスタマイズする場合は、次の例に示すようにリクエストをカスタマイズできます。

CompletableFuture<ReceiveMessageResponse> response = sqsAsyncBatchManager.receiveMessage(r -> r.queueUrl("queueUrl") .waitTimeSeconds(5) .visibilityTimeout(20));
注記

次のいずれかのパラメータReceiveMessageRequestを含む receiveMessageで を呼び出すと、 はバッチマネージャーをSDKバイパスし、通常の非同期receiveMessageリクエストを送信します。

  • messageAttributeNames

  • messageSystemAttributeNames

  • messageSystemAttributeNamesWithStrings

  • overrideConfiguration

の設定を上書きする SqsAsyncBatchManager

SqsAsyncBatchManager インスタンスを作成するときに、次の設定を調整できます。次の設定のリストは、 で利用できますBatchOverrideConfiguration.Builder

設定 説明 デフォルト値
maxBatchSize 各 、SendMessageBatchRequestChangeMessageVisibilityBatchRequestまたは のバッチあたりのリクエストの最大数DeleteMessageBatchRequest。最大値は 10 です。 10
sendRequestFrequency

maxBatchSize が早く達しない限り、バッチを送信するまでの時間。値を大きくすると、リクエストは減少しますが、レイテンシーは増加します。

200 ミリ秒
receiveMessageVisibilityTimeout メッセージの可視性タイムアウト。設定解除すると、キューのデフォルトが使用されます。 キューのデフォルト
receiveMessageMinWaitDuration receiveMessage リクエストの最小待機時間。CPU 無駄を防ぐために を 0 に設定しないでください。 50 ミリ秒
receiveMessageSystemAttributeNames receiveMessage 呼び出しをリクエストするシステム属性名のリスト。 [なし]
receiveMessageAttributeNames receiveMessage 呼び出しをリクエストする属性名のリスト。 [なし]