

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Amazon SQSのバッチアクション
<a name="sqs-batch-api-actions"></a>

Amazon SQS のバッチアクションを使用すると、コストを削減し、1 つのアクションで最大 10 件のメッセージを操作できます。バッチアクションは以下のとおりです。
+ `[SendMessageBatch](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessageBatch.html)`
+ `[DeleteMessageBatch](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_DeleteMessageBatch.html)`
+ `[ChangeMessageVisibilityBatch](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ChangeMessageVisibilityBatch.html)`

バッチアクションを使用すると、1 回の API コールで複数のオペレーションを実行できるため、パフォーマンスを最適化してコストを削減できます。クエリ API または Amazon SQS バッチアクションをサポートする任意の AWS SDK を使用して、バッチ機能を活用できます。

**重要な詳細**
+ **メッセージのサイズ制限:** 1 回の `SendMessageBatch` コールで送信するすべてのメッセージの合計サイズは、1,048,576 バイト (1 MiB) を超えることができません。
+ **アクセス許可:** `SendMessageBatch`、`DeleteMessageBatch`、または `ChangeMessageVisibilityBatch` のアクセス許可を明示的に設定することはできません。`SendMessage`、`DeleteMessage`、または `ChangeMessageVisibility` のアクセス許可を設定すると、代わりに、アクションの対応するバッチバージョンのアクセス許可が設定されます。
+ **コンソールのサポート:** Amazon SQS コンソールは、バッチアクションをサポートしていません。バッチオペレーションを実行するには、クエリ API または AWS SDK を使用する必要があります。

## メッセージアクションのバッチ処理
<a name="batching-message-actions"></a>

コストと効率をさらに最適化するには、メッセージアクションのバッチ処理に関する以下のベストプラクティスを検討してください。
+ **バッチ API アクション:** 1 つのアクションで複数のメッセージを送信、受信、削除したり、複数のメッセージのメッセージ可視性タイムアウトを変更したりするには、[Amazon SQS のバッチ API アクション](#sqs-batch-api-actions)を使用します。これにより、API コールの数と関連コストを削減できます。
+ **クライアント側のバッファリングとロングポーリング:** ロングポーリングと AWS SDK for Javaに含まれる[バッファリング非同期クライアント](sqs-client-side-buffering-request-batching.md)を併用することで、クライアント側のバッファリングとリクエストバッチ処理を組み合わせます。このアプローチは、リクエスト数を最小限に抑え、大量のメッセージ処理を最適化するのに役立ちます。

**注記**  
Amazon SQSバッファリング非同期クライアントは現在 FIFOキューをサポートしていません。

# Amazon SQS でのクライアント側のバッファリングとリクエストのバッチ処理の有効化
<a name="sqs-client-side-buffering-request-batching"></a>

[AWS SDK for Java](https://aws.amazon.com/sdkforjava/)を含む`AmazonSQSBufferedAsyncClient`Amazon SQSにアクセスするもの。このクライアントを使用すると、クライアント側のバッファリングを使用したシンプルなリクエストバッチ処理が可能になります。クライアントから行われた呼び出しが最初にバッファされ、Amazon SQS へのバッチリクエストとして送信される方法について説明します。

クライアント側のバッファリングは最大10個のリクエストをバッファリングし、バッチリクエストとして送信できるため、Amazon SQSの利用コストを削減して、送信リクエストの数を減らすことができます。バッファリングは同期および非同期コールの両方を`AmazonSQSBufferedAsyncClient` バッファします。バッチ処理されたリクエストと[ ロングポーリングのサポート](sqs-short-and-long-polling.md)によって、スループットを向上させることもできます。詳細については、「[Amazon SQS での水平スケーリングとアクションのバッチ処理を使用したスループットの向上](sqs-throughput-horizontal-scaling-and-batching.md)」を参照してください。

`AmazonSQSBufferedAsyncClient`は`AmazonSQSAsyncClient`と同じインターフェイスを実行するため、`AmazonSQSAsyncClient` から `AmazonSQSBufferedAsyncClient` に移行するには通常既存のコードを最小限変更するだけです。

**注記**  
Amazon SQS バッファリング非同期クライアントは現在 FIFOキューをサポートしていません。

## AmazonSQSBufferedAsyncClient の使用
<a name="using-buffered-async-client"></a>

開始する前に、「[Amazon SQSのセットアップ](sqs-setting-up.md)」のステップを完了します。

### AWS SDK for Java 1.x
<a name="using-buffered-async-client-java1"></a>

 AWS SDK for Java 1.x では、次の例`AmazonSQSBufferedAsyncClient`に基づいて新しい を作成できます。

```
// Create the basic Amazon SQS async client
final AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient();
 
// Create the buffered client
final AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync);
```

新規の `AmazonSQSBufferedAsyncClient` を作成したら、これを使用して Amazon SQS に複数のリクエストを送信できます (`AmazonSQSAsyncClient` と同様です)。次に例を示します。

```
final CreateQueueRequest createRequest = new CreateQueueRequest().withQueueName("MyQueue");
 
final CreateQueueResult res = bufferedSqs.createQueue(createRequest);
 
final SendMessageRequest request = new SendMessageRequest();
final String body = "Your message text" + System.currentTimeMillis();
request.setMessageBody( body );
request.setQueueUrl(res.getQueueUrl());
 
final Future<SendMessageResult> sendResult = bufferedSqs.sendMessageAsync(request);
 
final ReceiveMessageRequest receiveRq = new ReceiveMessageRequest()
    .withMaxNumberOfMessages(1)
    .withQueueUrl(queueUrl);
final ReceiveMessageResult rx = bufferedSqs.receiveMessage(receiveRq);
```

### AmazonSQSBufferedAsyncClient の設定
<a name="configuring-buffered-async-client"></a>

`AmazonSQSBufferedAsyncClient` は、ほとんどのユースケースに合うように事前に設定されています。たとえば、`AmazonSQSBufferedAsyncClient` をさらに設定できます。'

1. 必要な設定パラメータを使用して、`QueueBufferConfig` クラスのインスタンスを作成します。

1. `AmazonSQSBufferedAsyncClient` コンストラクタにインスタンスを指定します。

```
// Create the basic Amazon SQS async client
final AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient();
 
final QueueBufferConfig config = new QueueBufferConfig()
    .withMaxInflightReceiveBatches(5)
    .withMaxDoneReceiveBatches(15);
 
// Create the buffered client
final AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync, config);
```


**QueueBufferConfig の設定パラメータ**  

| パラメータ | デフォルトの値 | 説明 | 
| --- | --- | --- | 
| longPoll | true |  `longPoll` が `true` に設定されている場合、`AmazonSQSBufferedAsyncClient` はメッセージの処理時にロングポーリングを使用しようとします。  | 
| longPollWaitTimeoutSeconds | 20 s |  空の受信結果を返すまでに、キュー内へのメッセージの出現をサーバーが待機するのを `ReceiveMessage` 呼び出しがブロックする最大秒数。  ロングポーリングが無効になっている場合、この設定に効果はありません。   | 
| maxBatchOpenMs | 200ms |  送信呼び出しが、同じタイプのメッセージをバッチ処理する他の呼び出しを待機する最大ミリ秒。 設定を大きくすればするほど、同じ量の処理を実行するのに必要なバッチが少なくなります (ただし、バッチ内の最初の呼び出しは待機時間が長くなります)。 このパラメータを `0` に設定した場合、送信されたリクエストは他のリクエストを待機しないため、バッチ処理が事実上無効になります。  | 
| maxBatchSize | バッチあたり 10 個のリクエスト |  1 つのバッチリクエストでまとめてバッチ処理されるメッセージの最大数。設定を大きくするほど、全体数が同じリクエストの処理に要するバッチ数が減ります。  バッチあたり 10 個のリクエストはAmazon SQSの最大許容値です。   | 
| maxBatchSizeBytes | 1 MiB |  クライアントがAmazon SQSに送信しようとするメッセージバッチの最大サイズ、バイト単位。  1 MiB は、Amazon SQS の最大許容値です。   | 
| maxDoneReceiveBatches | 10 個のバッチ |  `AmazonSQSBufferedAsyncClient` がプリフェッチし、クライアント側に保存する受信バッチの最大数。 設定を大きくすればするほど、Amazon SQSを呼び出さなくても多くの受信リクエストを満たすことができます (ただし、プリフェッチされるメッセージが多くなるほど、バッファにとどまる時間が長くなるため、それ自体の可視性タイムアウトが発生する可能性があります)。  `0` は、すべてのメッセージのプリフェッチが無効になっていて、メッセージはオンデマンドでのみ消費されることを示します。   | 
| maxInflightOutboundBatches | 5 個のバッチ |  同時に処理できるアクティブな送信バッチの最大数。 設定を大きくすればするほど、送信バッチの送信速度が速くなり (CPU や帯域幅などの他のクォータの影響を受けます)、`AmazonSQSBufferedAsyncClient` により消費されるスレッドが増えます。  | 
| maxInflightReceiveBatches | 10 個のバッチ |  同時に処理できるアクティブな受信バッチの最大数。 設定を大きくすればするほど、受信するメッセージが増え (CPU や帯域幅などの他のクォータの影響を受けます)、`AmazonSQSBufferedAsyncClient` により消費されるスレッドが増えます。  `0` は、すべてのメッセージのプリフェッチが無効になっていて、メッセージはオンデマンドでのみ消費されることを示します。   | 
| visibilityTimeoutSeconds | -1 |  このパラメータが 0 以外の正の値に設定されている場合、ここで設定した可視性タイムアウトにより、メッセージの処理元のキューで設定された可視性タイムアウトが上書きされます。  `-1` は、キューのデフォルト設定が選択されていることを示します。 可視性タイムアウトを `0` に設定することはできません。   | 

### AWS SDK for Java 2.x
<a name="using-buffered-async-client-java2"></a>

 AWS SDK for Java 2.x では、次の例`SqsAsyncBatchManager`に基づいて新しい を作成できます。

```
// Create the basic Sqs Async Client
SqsAsyncClient sqs = SqsAsyncClient.builder() 
    .region(Region.US_EAST_1) 
    .build();

// Create the batch manager
SqsAsyncBatchManager sqsAsyncBatchManager = sqs.batchManager();
```

新規の `SqsAsyncBatchManager` を作成したら、これを使用して Amazon SQS に複数のリクエストを送信できます (`SqsAsyncClient` と同様です)。次に例を示します。

```
final String queueName = "MyAsyncBufferedQueue" + UUID.randomUUID();
final CreateQueueRequest request = CreateQueueRequest.builder().queueName(queueName).build();
final String queueUrl = sqs.createQueue(request).join().queueUrl();
System.out.println("Queue created: " + queueUrl);


// Send messages
CompletableFuture<SendMessageResponse> sendMessageFuture;
for (int i = 0; i < 10; i++) {
    final int index = i;
    sendMessageFuture = sqsAsyncBatchManager.sendMessage(
            r -> r.messageBody("Message " + index).queueUrl(queueUrl));
    SendMessageResponse response= sendMessageFuture.join();
    System.out.println("Message " + response.messageId() + " sent!");
}

// Receive messages with customized configurations
CompletableFuture<ReceiveMessageResponse> receiveResponseFuture = customizedBatchManager.receiveMessage(
        r -> r.queueUrl(queueUrl)
                .waitTimeSeconds(10)
                .visibilityTimeout(20)
                .maxNumberOfMessages(10)
);
System.out.println("You have received " + receiveResponseFuture.join().messages().size() + " messages in total.");

// Delete messages
DeleteQueueRequest deleteQueueRequest =  DeleteQueueRequest.builder().queueUrl(queueUrl).build();
int code = sqs.deleteQueue(deleteQueueRequest).join().sdkHttpResponse().statusCode();
System.out.println("Queue is deleted, with statusCode " + code);
```

### SqsAsyncBatchManager の設定
<a name="configuring-SqsAsyncBatchManager"></a>

`SqsAsyncBatchManager` は、ほとんどのユースケースに合うように事前に設定されています。たとえば、`SqsAsyncBatchManager` をさらに設定できます。'

`SqsAsyncBatchManager.Builder` を使用したカスタム設定の作成:

```
SqsAsyncBatchManager customizedBatchManager = SqsAsyncBatchManager.builder() 
    .client(sqs)
    .scheduledExecutor(Executors.newScheduledThreadPool(5))
    .overrideConfiguration(b -> b 
        .maxBatchSize(10)
        .sendRequestFrequency(Duration.ofMillis(200))
        .receiveMessageMinWaitDuration(Duration.ofSeconds(10))
        .receiveMessageVisibilityTimeout(Duration.ofSeconds(20)) 
        .receiveMessageAttributeNames(Collections.singletonList("*"))
        .receiveMessageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.ALL)))
    .build();
```


**`BatchOverrideConfiguration` 個のパラメータ**  

| パラメータ | デフォルトの値 | 説明 | 
| --- | --- | --- | 
| maxBatchSize |  バッチあたり 10 個のリクエスト  | 1 つのバッチリクエストでまとめてバッチ処理されるメッセージの最大数。設定を大きくするほど、全体数が同じリクエストの処理に要するバッチ数が減ります。  Amazon SQS の最大許容値はバッチあたり 10 個のリクエストです。  | 
| sendRequestFrequency |  200ms  | 送信呼び出しが、同じタイプのメッセージをバッチ処理する他の呼び出しを待機する最大ミリ秒。 設定を大きくすればするほど、同じ量の処理を実行するのに必要なバッチが少なくなります (ただし、バッチ内の最初の呼び出しは待機時間が長くなります)。 このパラメータを `0` に設定した場合、送信されたリクエストは他のリクエストを待機しないため、バッチ処理が事実上無効になります。 | 
| receiveMessageVisibilityTimeout |  -1  | このパラメータが 0 以外の正の値に設定されている場合、ここで設定した可視性タイムアウトにより、メッセージの処理元のキューで設定された可視性タイムアウトが上書きされます。   `1` は、キューのデフォルト設定が選択されていることを示します。可視性タイムアウトを `0` に設定することはできません。   | 
| receiveMessageMinWaitDuration |  50 ミリ秒  | `receiveMessage` 呼び出しが使用可能なメッセージの取得を待機する最小時間 (ミリ秒単位）。設定を大きくするほど、全体数が同じリクエストの処理に要するバッチ数が減ります。  | 

# Amazon SQS での水平スケーリングとアクションのバッチ処理を使用したスループットの向上
<a name="sqs-throughput-horizontal-scaling-and-batching"></a>

Amazon SQS は高スループットメッセージングをサポートしています。スループット制限の詳細については、「[Amazon SQS のメッセージキュー](quotas-messages.md)」を参照してください。

スループットを最大化するには:
+ それぞれのインスタンスを追加してプロデューサーとコンシューマーを水平方向に[スケール](#horizontal-scaling)します。
+ [アクションバッチ処理](#request-batching)を使用して、1 つのリクエストで複数のメッセージを送受信し、API コールのオーバーヘッドを削減します。

## 水平スケーリング
<a name="horizontal-scaling"></a>

Amazon SQSは HTTP リクエストレスポンスプロトコルを通じてアクセスするため、*リクエストのレイテンシー *(リクエストの開始からレスポンスの受信までの時間) により1回の接続を使用して1つのスレッドを処理した場合のスループットは制限されます。たとえば、Amazon EC2ベースのクライアントから同じリージョンにあるAmazon SQSへのレイテンシーが平均 20 ミリ秒の場合、1回の接続で1つのスレッドを処理した場合の最大スループットは平均で 50 TPS になります。

*水平スケーリング*には、全体的なキュースループットを高めるために、メッセージのプロデューサー (`[SendMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html)` リクエストを生成) とコンシューマー (`[ReceiveMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html)` リクエストと `[DeleteMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_DeleteMessage.html)` リクエストを生成) の数を増やすことが必要です。水平スケーリングを行うには 3 つの方法があります。
+ クライアントあたりのスレッドの数を増やす
+ クライアントを追加する
+ クライアントあたりのスレッドの数を増やし、クライアントを追加する

クライアントを追加すると、基本的にはキューのスループットが直線的に向上します。たとえば、クライアントの数を 2 倍にした場合、スループットも 2 倍になります。

## アクションバッチ処理
<a name="request-batching"></a>

*バッチ処理*では、サービスへの各ラウンドトリップでより多くの処理が実行されます (たとえば、1 回の `SendMessageBatch` リクエストで複数のメッセージを送信する場合など)。Amazon SQS バッチ アクションは、`[SendMessageBatch](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessageBatch.html)`、`[DeleteMessageBatch](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_DeleteMessageBatch.html)`、および `[ChangeMessageVisibilityBatch](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ChangeMessageVisibilityBatch.html)` です。プロデューサーやコンシューマーを変更することなくバッチ処理を活用するには、[Amazon SQSバファリング非同期クライアント](sqs-client-side-buffering-request-batching.md) を使用します。

**注記**  
`[ReceiveMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html)` は、一度に 10 件のメッセージを処理できるため、`ReceiveMessageBatch` アクションはありません。

バッチ処理では、1 件のメッセージのレイテンシー全体が受け入れられるのではなく、1 回のバッチリクエストの複数のメッセージにまたがるバッチアクションのレイテンシーが分散されます (たとえば、`[SendMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html)` リクエストなど)。各ラウンドトリップがより多くの処理を実行するため、バッチリクエストがスレッドと接続をより効率的に使用するようになり、スループットが向上します。

マッチ処理と水平スケーリングを組み合わせて、個々のメッセージリクエストより少ないスレッド、接続、リクエストで一定のスループットを実現できます。バッチ処理されたAmazon SQSアクションを使用して、最大 10 通のメッセージを一度に送信、受信、または削除できます。Amazon SQSではリクエスト単位で課金されるため、バッチ処理はコストを大幅に削減できます。

バッチ処理により、アプリケーションがいくらか複雑になる可能性はあります (たとえば、アプリケーションはメッセージを送信前に累積する必要があります。または、レスポンスを長時間待機する必要が生じることもときどきあります)。しかし、それでもバッチ処理は次の場合に効果的です: 
+ アプリケーションが短い時間で多くのメッセージを生成するため、遅延が大幅に長くなることはない。
+ 一般的なメッセージプロデューサーが自身でコントロールしていないイベントに応答してメッセージを送信する必要があるのと異なり、メッセージコンシューマーは自身の判断でキューからメッセージを取得する。

**重要**  
バッチ内の個々のメッセージが失敗しても、バッチリクエストは成功することがあります。バッチリクエストの後、必ず個々のメッセージのエラーがないか確認し、必要に応じてアクションを再試行してください。

## 1 回のオペレーションおよびバッチリクエストでの Java の使用例
<a name="working-java-example-batch-requests"></a>

### 前提条件
<a name="batch-request-java-example-prerequisites"></a>

`aws-java-sdk-sqs.jar`パッケージ、`aws-java-sdk-ec2.jar` パッケージおよび `commons-logging.jar` パッケージを Java ビルドクラスパスに追加します。以下の例は、Maven プロジェクトの `pom.xml` ファイルにあるこれらの依存関係を示しています。

```
<dependencies>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-sqs</artifactId>
        <version>LATEST</version>
    </dependency>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-ec2</artifactId>
        <version>LATEST</version>
    </dependency>
    <dependency>
        <groupId>commons-logging</groupId>
        <artifactId>commons-logging</artifactId>
        <version>LATEST</version>
    </dependency>
</dependencies>
```

### SimpleProducerConsumer.java
<a name="batch-request-java-example-code"></a>

次の Java コード例では、簡単なプロデューサー-コンシューマーパターンが実行されています。メインスレッドでは、指定された時間に 1 KB のメッセージを処理するプロデューサーおよびコンシューマースレッドが多数発生します。この例には、単一オペレーションリクエストを生成するプロデューサーおよびコンシューマーと、バッチ処理リクエストを生成するプロデューサーおよびコンシューマーが含まれています。

```
/*
 * Copyright 2010-2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  https://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 *
 */

import com.amazonaws.AmazonClientException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Start a specified number of producer and consumer threads, and produce-consume
 * for the least of the specified duration and 1 hour. Some messages can be left
 * in the queue because producers and consumers might not be in exact balance.
 */
public class SimpleProducerConsumer {

    // The maximum runtime of the program.
    private final static int MAX_RUNTIME_MINUTES = 60;
    private final static Log log = LogFactory.getLog(SimpleProducerConsumer.class);

    public static void main(String[] args) throws InterruptedException {

        final Scanner input = new Scanner(System.in);

        System.out.print("Enter the queue name: ");
        final String queueName = input.nextLine();

        System.out.print("Enter the number of producers: ");
        final int producerCount = input.nextInt();

        System.out.print("Enter the number of consumers: ");
        final int consumerCount = input.nextInt();

        System.out.print("Enter the number of messages per batch: ");
        final int batchSize = input.nextInt();

        System.out.print("Enter the message size in bytes: ");
        final int messageSizeByte = input.nextInt();

        System.out.print("Enter the run time in minutes: ");
        final int runTimeMinutes = input.nextInt();

        /*
         * Create a new instance of the builder with all defaults (credentials
         * and region) set automatically. For more information, see Creating
         * Service Clients in the AWS SDK for Java Developer Guide.
         */
        final ClientConfiguration clientConfiguration = new ClientConfiguration()
                .withMaxConnections(producerCount + consumerCount);

        final AmazonSQS sqsClient = AmazonSQSClientBuilder.standard()
                .withClientConfiguration(clientConfiguration)
                .build();

        final String queueUrl = sqsClient
                .getQueueUrl(new GetQueueUrlRequest(queueName)).getQueueUrl();

        // The flag used to stop producer, consumer, and monitor threads.
        final AtomicBoolean stop = new AtomicBoolean(false);

        // Start the producers.
        final AtomicInteger producedCount = new AtomicInteger();
        final Thread[] producers = new Thread[producerCount];
        for (int i = 0; i < producerCount; i++) {
            if (batchSize == 1) {
                producers[i] = new Producer(sqsClient, queueUrl, messageSizeByte,
                        producedCount, stop);
            } else {
                producers[i] = new BatchProducer(sqsClient, queueUrl, batchSize,
                        messageSizeByte, producedCount,
                        stop);
            }
            producers[i].start();
        }

        // Start the consumers.
        final AtomicInteger consumedCount = new AtomicInteger();
        final Thread[] consumers = new Thread[consumerCount];
        for (int i = 0; i < consumerCount; i++) {
            if (batchSize == 1) {
                consumers[i] = new Consumer(sqsClient, queueUrl, consumedCount,
                        stop);
            } else {
                consumers[i] = new BatchConsumer(sqsClient, queueUrl, batchSize,
                        consumedCount, stop);
            }
            consumers[i].start();
        }

        // Start the monitor thread.
        final Thread monitor = new Monitor(producedCount, consumedCount, stop);
        monitor.start();

        // Wait for the specified amount of time then stop.
        Thread.sleep(TimeUnit.MINUTES.toMillis(Math.min(runTimeMinutes,
                MAX_RUNTIME_MINUTES)));
        stop.set(true);

        // Join all threads.
        for (int i = 0; i < producerCount; i++) {
            producers[i].join();
        }

        for (int i = 0; i < consumerCount; i++) {
            consumers[i].join();
        }

        monitor.interrupt();
        monitor.join();
    }

    private static String makeRandomString(int sizeByte) {
        final byte[] bs = new byte[(int) Math.ceil(sizeByte * 5 / 8)];
        new Random().nextBytes(bs);
        bs[0] = (byte) ((bs[0] | 64) & 127);
        return new BigInteger(bs).toString(32);
    }

    /**
     * The producer thread uses {@code SendMessage}
     * to send messages until it is stopped.
     */
    private static class Producer extends Thread {
        final AmazonSQS sqsClient;
        final String queueUrl;
        final AtomicInteger producedCount;
        final AtomicBoolean stop;
        final String theMessage;

        Producer(AmazonSQS sqsQueueBuffer, String queueUrl, int messageSizeByte,
                 AtomicInteger producedCount, AtomicBoolean stop) {
            this.sqsClient = sqsQueueBuffer;
            this.queueUrl = queueUrl;
            this.producedCount = producedCount;
            this.stop = stop;
            this.theMessage = makeRandomString(messageSizeByte);
        }

        /*
         * The producedCount object tracks the number of messages produced by
         * all producer threads. If there is an error, the program exits the
         * run() method.
         */
        public void run() {
            try {
                while (!stop.get()) {
                    sqsClient.sendMessage(new SendMessageRequest(queueUrl,
                            theMessage));
                    producedCount.incrementAndGet();
                }
            } catch (AmazonClientException e) {
                /*
                 * By default, AmazonSQSClient retries calls 3 times before
                 * failing. If this unlikely condition occurs, stop.
                 */
                log.error("Producer: " + e.getMessage());
                System.exit(1);
            }
        }
    }

    /**
     * The producer thread uses {@code SendMessageBatch}
     * to send messages until it is stopped.
     */
    private static class BatchProducer extends Thread {
        final AmazonSQS sqsClient;
        final String queueUrl;
        final int batchSize;
        final AtomicInteger producedCount;
        final AtomicBoolean stop;
        final String theMessage;

        BatchProducer(AmazonSQS sqsQueueBuffer, String queueUrl, int batchSize,
                      int messageSizeByte, AtomicInteger producedCount,
                      AtomicBoolean stop) {
            this.sqsClient = sqsQueueBuffer;
            this.queueUrl = queueUrl;
            this.batchSize = batchSize;
            this.producedCount = producedCount;
            this.stop = stop;
            this.theMessage = makeRandomString(messageSizeByte);
        }

        public void run() {
            try {
                while (!stop.get()) {
                    final SendMessageBatchRequest batchRequest =
                            new SendMessageBatchRequest().withQueueUrl(queueUrl);

                    final List<SendMessageBatchRequestEntry> entries =
                            new ArrayList<SendMessageBatchRequestEntry>();
                    for (int i = 0; i < batchSize; i++)
                        entries.add(new SendMessageBatchRequestEntry()
                                .withId(Integer.toString(i))
                                .withMessageBody(theMessage));
                    batchRequest.setEntries(entries);

                    final SendMessageBatchResult batchResult =
                            sqsClient.sendMessageBatch(batchRequest);
                    producedCount.addAndGet(batchResult.getSuccessful().size());

                    /*
                     * Because SendMessageBatch can return successfully, but
                     * individual batch items fail, retry the failed batch items.
                     */
                    if (!batchResult.getFailed().isEmpty()) {
                        log.warn("Producer: retrying sending "
                                + batchResult.getFailed().size() + " messages");
                        for (int i = 0, n = batchResult.getFailed().size();
                             i < n; i++) {
                            sqsClient.sendMessage(new
                                    SendMessageRequest(queueUrl, theMessage));
                            producedCount.incrementAndGet();
                        }
                    }
                }
            } catch (AmazonClientException e) {
                /*
                 * By default, AmazonSQSClient retries calls 3 times before
                 * failing. If this unlikely condition occurs, stop.
                 */
                log.error("BatchProducer: " + e.getMessage());
                System.exit(1);
            }
        }
    }

    /**
     * The consumer thread uses {@code ReceiveMessage} and {@code DeleteMessage}
     * to consume messages until it is stopped.
     */
    private static class Consumer extends Thread {
        final AmazonSQS sqsClient;
        final String queueUrl;
        final AtomicInteger consumedCount;
        final AtomicBoolean stop;

        Consumer(AmazonSQS sqsClient, String queueUrl, AtomicInteger consumedCount,
                 AtomicBoolean stop) {
            this.sqsClient = sqsClient;
            this.queueUrl = queueUrl;
            this.consumedCount = consumedCount;
            this.stop = stop;
        }

        /*
         * Each consumer thread receives and deletes messages until the main
         * thread stops the consumer thread. The consumedCount object tracks the
         * number of messages that are consumed by all consumer threads, and the
         * count is logged periodically.
         */
        public void run() {
            try {
                while (!stop.get()) {
                    try {
                        final ReceiveMessageResult result = sqsClient
                                .receiveMessage(new
                                        ReceiveMessageRequest(queueUrl));

                        if (!result.getMessages().isEmpty()) {
                            final Message m = result.getMessages().get(0);
                            sqsClient.deleteMessage(new
                                    DeleteMessageRequest(queueUrl,
                                    m.getReceiptHandle()));
                            consumedCount.incrementAndGet();
                        }
                    } catch (AmazonClientException e) {
                        log.error(e.getMessage());
                    }
                }
            } catch (AmazonClientException e) {
                /*
                 * By default, AmazonSQSClient retries calls 3 times before
                 * failing. If this unlikely condition occurs, stop.
                 */
                log.error("Consumer: " + e.getMessage());
                System.exit(1);
            }
        }
    }

    /**
     * The consumer thread uses {@code ReceiveMessage} and {@code
     * DeleteMessageBatch} to consume messages until it is stopped.
     */
    private static class BatchConsumer extends Thread {
        final AmazonSQS sqsClient;
        final String queueUrl;
        final int batchSize;
        final AtomicInteger consumedCount;
        final AtomicBoolean stop;

        BatchConsumer(AmazonSQS sqsClient, String queueUrl, int batchSize,
                      AtomicInteger consumedCount, AtomicBoolean stop) {
            this.sqsClient = sqsClient;
            this.queueUrl = queueUrl;
            this.batchSize = batchSize;
            this.consumedCount = consumedCount;
            this.stop = stop;
        }

        public void run() {
            try {
                while (!stop.get()) {
                    final ReceiveMessageResult result = sqsClient
                            .receiveMessage(new ReceiveMessageRequest(queueUrl)
                                    .withMaxNumberOfMessages(batchSize));

                    if (!result.getMessages().isEmpty()) {
                        final List<Message> messages = result.getMessages();
                        final DeleteMessageBatchRequest batchRequest =
                                new DeleteMessageBatchRequest()
                                        .withQueueUrl(queueUrl);

                        final List<DeleteMessageBatchRequestEntry> entries =
                                new ArrayList<DeleteMessageBatchRequestEntry>();
                        for (int i = 0, n = messages.size(); i < n; i++)
                            entries.add(new DeleteMessageBatchRequestEntry()
                                    .withId(Integer.toString(i))
                                    .withReceiptHandle(messages.get(i)
                                            .getReceiptHandle()));
                        batchRequest.setEntries(entries);

                        final DeleteMessageBatchResult batchResult = sqsClient
                                .deleteMessageBatch(batchRequest);
                        consumedCount.addAndGet(batchResult.getSuccessful().size());

                        /*
                         * Because DeleteMessageBatch can return successfully,
                         * but individual batch items fail, retry the failed
                         * batch items.
                         */
                        if (!batchResult.getFailed().isEmpty()) {
                            final int n = batchResult.getFailed().size();
                            log.warn("Producer: retrying deleting " + n
                                    + " messages");
                            for (BatchResultErrorEntry e : batchResult
                                    .getFailed()) {

                                sqsClient.deleteMessage(
                                        new DeleteMessageRequest(queueUrl,
                                                messages.get(Integer
                                                        .parseInt(e.getId()))
                                                        .getReceiptHandle()));

                                consumedCount.incrementAndGet();
                            }
                        }
                    }
                }
            } catch (AmazonClientException e) {
                /*
                 * By default, AmazonSQSClient retries calls 3 times before
                 * failing. If this unlikely condition occurs, stop.
                 */
                log.error("BatchConsumer: " + e.getMessage());
                System.exit(1);
            }
        }
    }

    /**
     * This thread prints every second the number of messages produced and
     * consumed so far.
     */
    private static class Monitor extends Thread {
        private final AtomicInteger producedCount;
        private final AtomicInteger consumedCount;
        private final AtomicBoolean stop;

        Monitor(AtomicInteger producedCount, AtomicInteger consumedCount,
                AtomicBoolean stop) {
            this.producedCount = producedCount;
            this.consumedCount = consumedCount;
            this.stop = stop;
        }

        public void run() {
            try {
                while (!stop.get()) {
                    Thread.sleep(1000);
                    log.info("produced messages = " + producedCount.get()
                            + ", consumed messages = " + consumedCount.get());
                }
            } catch (InterruptedException e) {
                // Allow the thread to exit.
            }
        }
    }
}
```

### サンプル実行からのボリュームメトリクスのモニタリング
<a name="batch-request-java-example-monitoring-metrics"></a>

Amazon SQSは、メッセージの送信、受信、削除のボリュームメトリクスを自動的に生成します。これらのメトリックスと他のメトリックスにアクセスするには、キューの ｛**モニタリング**｝タブを使用するか、[[CloudWatchコンソール](https://console.aws.amazon.com/cloudwatch/home)]を使用します。

**注記**  
メトリクスを、キューが開始してから参照可能になるまで最大 15 分かかる場合があります。