

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# Amazon SQS 배치 작업
<a name="sqs-batch-api-actions"></a>

Amazon SQS는 비용을 절감하고 단일 작업으로 최대 10개의 메시지를 조작할 수 있는 배치 작업을 제공합니다. 이러한 배치 작업에는 다음이 포함됩니다.
+ `[SendMessageBatch](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessageBatch.html)`
+ `[DeleteMessageBatch](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_DeleteMessageBatch.html)`
+ `[ChangeMessageVisibilityBatch](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ChangeMessageVisibilityBatch.html)`

배치 작업을 사용하면 단일 API 직접 호출로 여러 작업을 수행할 수 있으므로 성능을 최적화하고 비용을 절감할 수 있습니다. Amazon SQS 배치 작업을 지원하는 쿼리 API 또는 AWS SDK를 사용하여 배치 기능을 활용할 수 있습니다.

**중요한 세부 정보**
+ **메시지 크기 제한:** 단일 `SendMessageBatch` 직접 호출로 전송되는 모든 메시지의 총 크기는 1,048,576바이트(1MiB)를 초과할 수 없습니다.
+ **권한:** `SendMessageBatch`, `DeleteMessageBatch` 또는 `ChangeMessageVisibilityBatch`에 대한 권한을 명시적으로 설정할 수 없습니다. `SendMessage`, `DeleteMessage` 또는 `ChangeMessageVisibility`에 대한 권한을 설정하면 작업의 해당 배치 버전에 대한 권한도 설정됩니다.
+ **콘솔 지원:** Amazon SQS 콘솔은 배치 작업을 지원하지 않습니다. 쿼리 API 또는 AWS SDK를 사용하여 배치 작업을 수행해야 합니다.

## 메시지 작업 일괄 처리
<a name="batching-message-actions"></a>

비용 및 효율성을 더욱 최적화하려면 메시지 작업 배치 처리를 위한 다음 모범 사례를 고려하세요.
+ **배치 API 작업:** [Amazon SQS 배치 API 작업](#sqs-batch-api-actions)을 사용하여 메시지를 전송, 수신 및 삭제하고 한 번의 작업으로 여러 메시지에 대한 메시지 표시 시간 제한을 변경합니다. 이렇게 하면 API 직접 호출 수와 관련 비용이 줄어듭니다.
+ **클라이언트 측 버퍼링 긴 롱 폴링:** AWS SDK for Java에 포함된 [버퍼링된 비동기식 클라이언트](sqs-client-side-buffering-request-batching.md)와 함께 긴 폴링을 사용하여 클라이언트 측 버퍼링과 요청 배치 처리를 결합합니다. 이 접근 방식은 요청 수를 최소화하고 대량의 메시지 처리를 최적화하는 데 도움이 됩니다.

**참고**  
Amazon SQS의 버퍼링된 비동기식 클라이언트는 현재 FIFO 대기열을 지원하지 않습니다.

# Amazon SQS로 클라이언트 측 버퍼링 및 요청 배치 처리 활성화
<a name="sqs-client-side-buffering-request-batching"></a>

[AWS SDK for Java](https://aws.amazon.com/sdkforjava/)는 Amazon SQS에 액세스하는 `AmazonSQSBufferedAsyncClient`를 포함합니다. 이 클라이언트는 클라이언트 측 버퍼링을 사용하여 간단한 요청 배치 처리를 허용합니다. 클라이언트에서 수행한 직접 호출을 먼저 버퍼링한 후 배치 요청 형태로 Amazon SQS로 전송합니다.

클라이언트 측 버퍼링을 통해 최대 10개의 요청을 버퍼링하여 배치 요청으로 전송할 수 있으므로 Amazon SQS 사용 비용이 절감되고 전송되는 요청 수가 줄어듭니다. `AmazonSQSBufferedAsyncClient` 버퍼는 동기식 및 비동기식 호출을 모두 버퍼링합니다. 배치 처리된 요청과 [긴 폴링](sqs-short-and-long-polling.md) 지원을 통해 처리량도 높일 수 있습니다. 자세한 내용은 [Amazon SQS로 수평적 조정과 작업 배치 처리를 사용하여 처리량 증대](sqs-throughput-horizontal-scaling-and-batching.md) 단원을 참조하십시오.

`AmazonSQSBufferedAsyncClient`는 `AmazonSQSAsyncClient`와 동일한 인터페이스를 구현하므로 `AmazonSQSAsyncClient`에서 `AmazonSQSBufferedAsyncClient`로 마이그레이션하려면 일반적으로 기존 코드를 최소한만 변경하면 됩니다.

**참고**  
Amazon SQS의 버퍼링된 비동기식 클라이언트는 현재 FIFO 대기열을 지원하지 않습니다.

## AmazonSQSBufferedAsyncClient 사용
<a name="using-buffered-async-client"></a>

시작하기 전에 [Amazon SQS 설정](sqs-setting-up.md)의 단계를 완료해야 합니다.

### AWS Java 1.x용 SDK
<a name="using-buffered-async-client-java1"></a>

 AWS SDK for Java 1.x의 경우 다음 예제를 `AmazonSQSBufferedAsyncClient` 기반으로 새를 생성할 수 있습니다.

```
// Create the basic Amazon SQS async client
final AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient();
 
// Create the buffered client
final AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync);
```

새 `AmazonSQSBufferedAsyncClient`를 생성한 후 이것을 이용해서 Amazon SQS에 다중 요청을 전송할 수 있습니다(`AmazonSQSAsyncClient`와 마찬가지로). 예를 들면 다음과 같습니다.

```
final CreateQueueRequest createRequest = new CreateQueueRequest().withQueueName("MyQueue");
 
final CreateQueueResult res = bufferedSqs.createQueue(createRequest);
 
final SendMessageRequest request = new SendMessageRequest();
final String body = "Your message text" + System.currentTimeMillis();
request.setMessageBody( body );
request.setQueueUrl(res.getQueueUrl());
 
final Future<SendMessageResult> sendResult = bufferedSqs.sendMessageAsync(request);
 
final ReceiveMessageRequest receiveRq = new ReceiveMessageRequest()
    .withMaxNumberOfMessages(1)
    .withQueueUrl(queueUrl);
final ReceiveMessageResult rx = bufferedSqs.receiveMessage(receiveRq);
```

### AmazonSQSBufferedAsyncClient 구성
<a name="configuring-buffered-async-client"></a>

`AmazonSQSBufferedAsyncClient`는 대부분의 사용 사례에 적용되는 설정으로 사전 구성됩니다. `AmazonSQSBufferedAsyncClient`를 추가로 구성할 수 있습니다. 예를 들면 다음과 같습니다.

1. 필요한 구성 파라미터로 `QueueBufferConfig` 클래스의 인스턴스를 만듭니다.

1. `AmazonSQSBufferedAsyncClient` 생성자에 인스턴스를 제공합니다.

```
// Create the basic Amazon SQS async client
final AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient();
 
final QueueBufferConfig config = new QueueBufferConfig()
    .withMaxInflightReceiveBatches(5)
    .withMaxDoneReceiveBatches(15);
 
// Create the buffered client
final AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync, config);
```


**QueueBufferConfig 구성 파라미터**  

| 파라미터 | 기본값  | 설명 | 
| --- | --- | --- | 
| longPoll | true |  `longPoll`이 `true`로 설정된 경우 `AmazonSQSBufferedAsyncClient`는 메시지를 사용할 때 긴 폴링을 사용하려고 시도합니다.  | 
| longPollWaitTimeoutSeconds | 20s |  `ReceiveMessage` 호출이 서버에서 차단되고 빈 수신 결과를 반환하기 전에 메시지가 대기열에 나타나기까지 대기하는 최대 시간(초)입니다.  긴 폴링을 비활성화하면 이 설정은 영향을 주지 않습니다.   | 
| maxBatchOpenMs | 200ms |  발신 호출이 동일한 유형의 메시지를 배치 처리할 다른 호출을 대기하는 최대 시간(밀리초)입니다. 설정이 높을수록 동일한 작업량을 수행하는 데 필요한 배치 수가 감소합니다(그러나 배치의 첫 번째 호출을 대기하는 시간이 길어짐). 이 파라미터를 `0`으로 설정하면, 제출된 요청은 다른 요청을 기다리지 않으므로 사실상 일괄 처리하지 않게 됩니다.  | 
| maxBatchSize | 배치당 요청 10개 |  단일 요청에서 배치 방식으로 함께 처리되는 최대 메시지 수입니다. 설정이 높을수록 동일한 요청 수를 수행하는 데 필요한 배치 수가 감소합니다.  배치당 요청 10개는 Amazon SQS에 허용되는 최대값입니다.   | 
| maxBatchSizeBytes | 1MiB |  클라이언트가 Amazon SQS에 전송하려고 시도하는 메시지 배치의 최대 크기(바이트 단위)입니다.  1MiB는 Amazon SQS에 허용되는 최댓값입니다.   | 
| maxDoneReceiveBatches | 배치 10개 |  `AmazonSQSBufferedAsyncClient`가 클라이언트 측에서 미리 가져와서 저장하는 수신 배치의 최대 개수입니다. 설정이 높을수록 Amazon SQS에 호출을 전송하지 않고도 만족할 수 있는 수신 요청 개수가 늘어납니다(그러나 미리 가져오는 메시지가 많을수록 메시지가 버퍼에 있는 시간이 길어지므로 표시 제한 시간이 만료될 수 있음).  `0`은 모든 메시지 미리 가져오기 기능이 비활성화되고 메시지가 요청 시에만 사용됨을 나타냅니다.   | 
| maxInflightOutboundBatches | 배치 5개 |  동시에 처리될 수 있는 활성 아웃바운드 배치의 최대 개수입니다. 설정이 높을수록 아웃바운드 배치 전송 속도가 빨라지고(CPU 또는 대역폭과 같은 할당량도 영향을 줌), `AmazonSQSBufferedAsyncClient`에서 사용하는 스레드 수가 많아집니다.  | 
| maxInflightReceiveBatches | 배치 10개 |  동시에 처리될 수 있는 활성 수신 배치의 최대 개수입니다. 설정이 높을수록 수신 가능한 메시지가 많아지고(CPU 또는 대역폭과 같은 할당량도 영향을 줌), `AmazonSQSBufferedAsyncClient`에서 사용하는 스레드 수가 많아집니다.  `0`은 모든 메시지 미리 가져오기 기능이 비활성화되고 메시지가 요청 시에만 사용됨을 나타냅니다.   | 
| visibilityTimeoutSeconds | -1 |  이 파라미터를 0이 아닌 양수 값으로 설정하면, 여기에 설정된 제한 시간 초과는 메시지가 사용되는 대기열에서 설정된 제한 시간 초과를 재정의합니다.  `-1`은 대기열에서 기본 설정이 선택되어 있음을 나타냅니다. 제한 시간 초과를 `0`으로 설정할 수 없습니다.   | 

### AWS Java 2.x용 SDK
<a name="using-buffered-async-client-java2"></a>

 AWS SDK for Java 2.x의 경우 다음 예제를 `SqsAsyncBatchManager` 기반으로 새를 생성할 수 있습니다.

```
// Create the basic Sqs Async Client
SqsAsyncClient sqs = SqsAsyncClient.builder() 
    .region(Region.US_EAST_1) 
    .build();

// Create the batch manager
SqsAsyncBatchManager sqsAsyncBatchManager = sqs.batchManager();
```

새 `SqsAsyncBatchManager`를 생성한 후 이것을 이용해서 Amazon SQS에 다중 요청을 전송할 수 있습니다(`SqsAsyncClient`와 마찬가지로). 예를 들면 다음과 같습니다.

```
final String queueName = "MyAsyncBufferedQueue" + UUID.randomUUID();
final CreateQueueRequest request = CreateQueueRequest.builder().queueName(queueName).build();
final String queueUrl = sqs.createQueue(request).join().queueUrl();
System.out.println("Queue created: " + queueUrl);


// Send messages
CompletableFuture<SendMessageResponse> sendMessageFuture;
for (int i = 0; i < 10; i++) {
    final int index = i;
    sendMessageFuture = sqsAsyncBatchManager.sendMessage(
            r -> r.messageBody("Message " + index).queueUrl(queueUrl));
    SendMessageResponse response= sendMessageFuture.join();
    System.out.println("Message " + response.messageId() + " sent!");
}

// Receive messages with customized configurations
CompletableFuture<ReceiveMessageResponse> receiveResponseFuture = customizedBatchManager.receiveMessage(
        r -> r.queueUrl(queueUrl)
                .waitTimeSeconds(10)
                .visibilityTimeout(20)
                .maxNumberOfMessages(10)
);
System.out.println("You have received " + receiveResponseFuture.join().messages().size() + " messages in total.");

// Delete messages
DeleteQueueRequest deleteQueueRequest =  DeleteQueueRequest.builder().queueUrl(queueUrl).build();
int code = sqs.deleteQueue(deleteQueueRequest).join().sdkHttpResponse().statusCode();
System.out.println("Queue is deleted, with statusCode " + code);
```

### SqsAsyncBatchManager 구성
<a name="configuring-SqsAsyncBatchManager"></a>

`SqsAsyncBatchManager`는 대부분의 사용 사례에 적용되는 설정으로 사전 구성됩니다. `SqsAsyncBatchManager`를 추가로 구성할 수 있습니다. 예를 들면 다음과 같습니다.

`SqsAsyncBatchManager.Builder`를 통한 사용자 지정 구성 생성:

```
SqsAsyncBatchManager customizedBatchManager = SqsAsyncBatchManager.builder() 
    .client(sqs)
    .scheduledExecutor(Executors.newScheduledThreadPool(5))
    .overrideConfiguration(b -> b 
        .maxBatchSize(10)
        .sendRequestFrequency(Duration.ofMillis(200))
        .receiveMessageMinWaitDuration(Duration.ofSeconds(10))
        .receiveMessageVisibilityTimeout(Duration.ofSeconds(20)) 
        .receiveMessageAttributeNames(Collections.singletonList("*"))
        .receiveMessageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.ALL)))
    .build();
```


**`BatchOverrideConfiguration` 파라미터**  

| 파라미터 | 기본값  | 설명 | 
| --- | --- | --- | 
| maxBatchSize |  배치당 요청 10개  | 단일 요청에서 배치 방식으로 함께 처리되는 최대 메시지 수입니다. 설정이 높을수록 동일한 요청 수를 수행하는 데 필요한 배치 수가 감소합니다.  Amazon SQS에 허용되는 최댓값은 배치당 요청 10개입니다.  | 
| sendRequestFrequency |  200ms  | 발신 호출이 동일한 유형의 메시지를 배치 처리할 다른 호출을 대기하는 최대 시간(밀리초)입니다. 설정이 높을수록 동일한 작업량을 수행하는 데 필요한 배치 수가 감소합니다(그러나 배치의 첫 번째 호출을 대기하는 시간이 길어짐). 이 파라미터를 `0`으로 설정하면, 제출된 요청은 다른 요청을 기다리지 않으므로 사실상 일괄 처리하지 않게 됩니다. | 
| receiveMessageVisibilityTimeout |  -1  | 이 파라미터를 0이 아닌 양수 값으로 설정하면, 여기에 설정된 제한 시간 초과는 메시지가 사용되는 대기열에서 설정된 제한 시간 초과를 재정의합니다.   `1`은 대기열에서 기본 설정이 선택되어 있음을 나타냅니다. 제한 시간 초과를 `0`으로 설정할 수 없습니다.   | 
| receiveMessageMinWaitDuration |  50ms  | 사용 가능한 메시지를 가져올 때까지 `receiveMessage` 직접 호출이 대기하는 최소 시간(밀리초)입니다. 설정이 높을수록 동일한 요청 수를 수행하는 데 필요한 배치 수가 감소합니다.  | 

# Amazon SQS로 수평적 조정과 작업 배치 처리를 사용하여 처리량 증대
<a name="sqs-throughput-horizontal-scaling-and-batching"></a>

Amazon SQS는 처리량이 많은 메시징을 지원합니다. 처리량 한도에 대한 자세한 내용은 [Amazon SQS 메시지 할당량](quotas-messages.md) 섹션을 참조하세요.

처리량을 극대화하는 방법:
+ 각 인스턴스를 더 추가하여 생산자와 소비자를 수평적으로 [규모를 조정](#horizontal-scaling)합니다.
+ [작업 배치 처리](#request-batching)를 사용하여 단일 요청으로 여러 메시지를 보내거나 수신하여 API 직접 호출 오버헤드를 줄입니다.

## 수평 크기 조정
<a name="horizontal-scaling"></a>

HTTP 요청-응답 프로토콜을 통해 Amazon SQS에 액세스하기 때문에 요청 지연 시간**(요청을 시작해서 응답을 받기까지의 시간 간격)은 단일 연결을 사용하여 단일 스레드에서 확보할 수 있는 처리량을 제한합니다. 예를 들어, 동일한 리전에서 Amazon EC2 기반 클라이언트부터 Amazon SQS까지 지연 시간이 평균 20ms인 경우 단일 연결을 통한 단일 스레드의 최대 처리량은 평균 50TPS입니다.

*수평적 조정*은 메시지 생성자(`[SendMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html)` 요청 전송)와 소비자(`[ReceiveMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html)` 및 `[DeleteMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_DeleteMessage.html)` 요청 전송) 수를 늘리는 것으로 전체 대기열 처리량을 높여줍니다. 다음 세 가지 방법으로 수평 확장할 수 있습니다.
+ 클라이언트당 스레드 수 늘리기
+ 클라이언트 추가
+ 클라이언트당 스레드 수를 늘리고 클라이언트 추가

클라이언트를 더 추가하면 대기열 처리량이 선형적으로 증가합니다. 예를 들어 클라이언트 개수를 2배 늘리면 처리량도 2배가 됩니다.

## 작업 일괄 처리
<a name="request-batching"></a>

*일괄 처리*는 서비스까지 왕복할 때마다 더 많은 작업을 수행합니다(예: `SendMessageBatch` 요청 하나로 여러 메시지를 전송하는 경우). Amazon SQS 배치 작업은 `[SendMessageBatch](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessageBatch.html)`, `[DeleteMessageBatch](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_DeleteMessageBatch.html)` 및 `[ChangeMessageVisibilityBatch](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ChangeMessageVisibilityBatch.html)`입니다. 생산자 또는 소비자를 변경하지 않고 배치 처리를 활용하려면 [Amazon SQS의 버퍼링된 비동기식 클라이언트](sqs-client-side-buffering-request-batching.md)를 사용하면 됩니다.

**참고**  
`[ReceiveMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html)`는 한 번에 메시지를 10개 처리할 수 있으므로 `ReceiveMessageBatch` 작업이 없습니다.

일괄 처리는 메시지 하나의 전체 지연 시간을 수락하는 것이 아니라, 배치 작업의 지연 시간을 배치 요청의 여러 메시지로 분산시키는 것입니다(예: `[SendMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html)` 요청). 각 왕복마다 더 많은 작업을 수행하기 때문에 배치 요청은 스레드와 연결을 보다 효율적으로 사용하므로 처리량이 개선됩니다.

일괄 처리와 수평적 조정을 함께 사용하면 개별 메시지 요청보다 스레드, 연결 및 요청 수가 적은 처리량을 얻습니다. 배치 처리된 Amazon SQS 작업을 사용하여 한 번에 최대 10개 메시지를 전송, 수신 또는 삭제할 수 있습니다. Amazon SQS는 요청별로 요금을 부과하기 때문에 배치 처리 시 비용을 대폭 절감할 수 있습니다.

일괄 처리를 사용하면 애플리케이션에 어느 정도 복잡성이 수반됩니다(예를 들어 애플리케이션이 메시지를 누적한 후 메시지를 전송하고 때로는 응답을 받기까지 대기 시간이 길어짐). 그러나 다음과 같은 상황에서는 일괄 처리가 효과적일 수 있습니다.
+ 애플리케이션이 단시간에 많은 메시지를 생성하지만 지연 시간이 매우 길지 않은 경우.
+ 제어하지 못하는 이벤트에 대응하여 메시지를 전송해야 하는 일반적인 메시지 생성자와 달리 메시지 소비자가 단독 재량으로 대기열에서 메시지를 가져오는 경우.

**중요**  
배치의 개별 메시지에 오류가 발생하더라도 배치 요청이 성공할 수 있습니다. 배치 요청 후에는 항상 개별 메시지의 오류 여부를 확인하고 필요에 따라 작업을 다시 시도해야 합니다.

## 단일 작업 및 배치 요청에 대한 Java 사용 예제
<a name="working-java-example-batch-requests"></a>

### 사전 조건
<a name="batch-request-java-example-prerequisites"></a>

`aws-java-sdk-sqs.jar`, `aws-java-sdk-ec2.jar`, `commons-logging.jar` 패키지를 Java 빌드 클래스 경로에 추가합니다. 다음 예제는 Maven 프로젝트의 `pom.xml` 파일 내에 존재하는 이러한 종속성을 보여줍니다.

```
<dependencies>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-sqs</artifactId>
        <version>LATEST</version>
    </dependency>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-ec2</artifactId>
        <version>LATEST</version>
    </dependency>
    <dependency>
        <groupId>commons-logging</groupId>
        <artifactId>commons-logging</artifactId>
        <version>LATEST</version>
    </dependency>
</dependencies>
```

### SimpleProducerConsumer.java
<a name="batch-request-java-example-code"></a>

다음 Java 코드 예제는 단순한 생성자-소비자 패턴을 구현한 것입니다. 기본 스레드는 지정된 시간 동안 1KB 메시지를 처리하는 생성자와 소비자 스레드를 다수 생성합니다. 이 예제에는 단일 작업을 요청하는 생성자와 소비자뿐 아니라 배치 요청을 하는 생성자와 소비자가 포함되어 있습니다.

```
/*
 * Copyright 2010-2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  https://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 *
 */

import com.amazonaws.AmazonClientException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Start a specified number of producer and consumer threads, and produce-consume
 * for the least of the specified duration and 1 hour. Some messages can be left
 * in the queue because producers and consumers might not be in exact balance.
 */
public class SimpleProducerConsumer {

    // The maximum runtime of the program.
    private final static int MAX_RUNTIME_MINUTES = 60;
    private final static Log log = LogFactory.getLog(SimpleProducerConsumer.class);

    public static void main(String[] args) throws InterruptedException {

        final Scanner input = new Scanner(System.in);

        System.out.print("Enter the queue name: ");
        final String queueName = input.nextLine();

        System.out.print("Enter the number of producers: ");
        final int producerCount = input.nextInt();

        System.out.print("Enter the number of consumers: ");
        final int consumerCount = input.nextInt();

        System.out.print("Enter the number of messages per batch: ");
        final int batchSize = input.nextInt();

        System.out.print("Enter the message size in bytes: ");
        final int messageSizeByte = input.nextInt();

        System.out.print("Enter the run time in minutes: ");
        final int runTimeMinutes = input.nextInt();

        /*
         * Create a new instance of the builder with all defaults (credentials
         * and region) set automatically. For more information, see Creating
         * Service Clients in the AWS SDK for Java Developer Guide.
         */
        final ClientConfiguration clientConfiguration = new ClientConfiguration()
                .withMaxConnections(producerCount + consumerCount);

        final AmazonSQS sqsClient = AmazonSQSClientBuilder.standard()
                .withClientConfiguration(clientConfiguration)
                .build();

        final String queueUrl = sqsClient
                .getQueueUrl(new GetQueueUrlRequest(queueName)).getQueueUrl();

        // The flag used to stop producer, consumer, and monitor threads.
        final AtomicBoolean stop = new AtomicBoolean(false);

        // Start the producers.
        final AtomicInteger producedCount = new AtomicInteger();
        final Thread[] producers = new Thread[producerCount];
        for (int i = 0; i < producerCount; i++) {
            if (batchSize == 1) {
                producers[i] = new Producer(sqsClient, queueUrl, messageSizeByte,
                        producedCount, stop);
            } else {
                producers[i] = new BatchProducer(sqsClient, queueUrl, batchSize,
                        messageSizeByte, producedCount,
                        stop);
            }
            producers[i].start();
        }

        // Start the consumers.
        final AtomicInteger consumedCount = new AtomicInteger();
        final Thread[] consumers = new Thread[consumerCount];
        for (int i = 0; i < consumerCount; i++) {
            if (batchSize == 1) {
                consumers[i] = new Consumer(sqsClient, queueUrl, consumedCount,
                        stop);
            } else {
                consumers[i] = new BatchConsumer(sqsClient, queueUrl, batchSize,
                        consumedCount, stop);
            }
            consumers[i].start();
        }

        // Start the monitor thread.
        final Thread monitor = new Monitor(producedCount, consumedCount, stop);
        monitor.start();

        // Wait for the specified amount of time then stop.
        Thread.sleep(TimeUnit.MINUTES.toMillis(Math.min(runTimeMinutes,
                MAX_RUNTIME_MINUTES)));
        stop.set(true);

        // Join all threads.
        for (int i = 0; i < producerCount; i++) {
            producers[i].join();
        }

        for (int i = 0; i < consumerCount; i++) {
            consumers[i].join();
        }

        monitor.interrupt();
        monitor.join();
    }

    private static String makeRandomString(int sizeByte) {
        final byte[] bs = new byte[(int) Math.ceil(sizeByte * 5 / 8)];
        new Random().nextBytes(bs);
        bs[0] = (byte) ((bs[0] | 64) & 127);
        return new BigInteger(bs).toString(32);
    }

    /**
     * The producer thread uses {@code SendMessage}
     * to send messages until it is stopped.
     */
    private static class Producer extends Thread {
        final AmazonSQS sqsClient;
        final String queueUrl;
        final AtomicInteger producedCount;
        final AtomicBoolean stop;
        final String theMessage;

        Producer(AmazonSQS sqsQueueBuffer, String queueUrl, int messageSizeByte,
                 AtomicInteger producedCount, AtomicBoolean stop) {
            this.sqsClient = sqsQueueBuffer;
            this.queueUrl = queueUrl;
            this.producedCount = producedCount;
            this.stop = stop;
            this.theMessage = makeRandomString(messageSizeByte);
        }

        /*
         * The producedCount object tracks the number of messages produced by
         * all producer threads. If there is an error, the program exits the
         * run() method.
         */
        public void run() {
            try {
                while (!stop.get()) {
                    sqsClient.sendMessage(new SendMessageRequest(queueUrl,
                            theMessage));
                    producedCount.incrementAndGet();
                }
            } catch (AmazonClientException e) {
                /*
                 * By default, AmazonSQSClient retries calls 3 times before
                 * failing. If this unlikely condition occurs, stop.
                 */
                log.error("Producer: " + e.getMessage());
                System.exit(1);
            }
        }
    }

    /**
     * The producer thread uses {@code SendMessageBatch}
     * to send messages until it is stopped.
     */
    private static class BatchProducer extends Thread {
        final AmazonSQS sqsClient;
        final String queueUrl;
        final int batchSize;
        final AtomicInteger producedCount;
        final AtomicBoolean stop;
        final String theMessage;

        BatchProducer(AmazonSQS sqsQueueBuffer, String queueUrl, int batchSize,
                      int messageSizeByte, AtomicInteger producedCount,
                      AtomicBoolean stop) {
            this.sqsClient = sqsQueueBuffer;
            this.queueUrl = queueUrl;
            this.batchSize = batchSize;
            this.producedCount = producedCount;
            this.stop = stop;
            this.theMessage = makeRandomString(messageSizeByte);
        }

        public void run() {
            try {
                while (!stop.get()) {
                    final SendMessageBatchRequest batchRequest =
                            new SendMessageBatchRequest().withQueueUrl(queueUrl);

                    final List<SendMessageBatchRequestEntry> entries =
                            new ArrayList<SendMessageBatchRequestEntry>();
                    for (int i = 0; i < batchSize; i++)
                        entries.add(new SendMessageBatchRequestEntry()
                                .withId(Integer.toString(i))
                                .withMessageBody(theMessage));
                    batchRequest.setEntries(entries);

                    final SendMessageBatchResult batchResult =
                            sqsClient.sendMessageBatch(batchRequest);
                    producedCount.addAndGet(batchResult.getSuccessful().size());

                    /*
                     * Because SendMessageBatch can return successfully, but
                     * individual batch items fail, retry the failed batch items.
                     */
                    if (!batchResult.getFailed().isEmpty()) {
                        log.warn("Producer: retrying sending "
                                + batchResult.getFailed().size() + " messages");
                        for (int i = 0, n = batchResult.getFailed().size();
                             i < n; i++) {
                            sqsClient.sendMessage(new
                                    SendMessageRequest(queueUrl, theMessage));
                            producedCount.incrementAndGet();
                        }
                    }
                }
            } catch (AmazonClientException e) {
                /*
                 * By default, AmazonSQSClient retries calls 3 times before
                 * failing. If this unlikely condition occurs, stop.
                 */
                log.error("BatchProducer: " + e.getMessage());
                System.exit(1);
            }
        }
    }

    /**
     * The consumer thread uses {@code ReceiveMessage} and {@code DeleteMessage}
     * to consume messages until it is stopped.
     */
    private static class Consumer extends Thread {
        final AmazonSQS sqsClient;
        final String queueUrl;
        final AtomicInteger consumedCount;
        final AtomicBoolean stop;

        Consumer(AmazonSQS sqsClient, String queueUrl, AtomicInteger consumedCount,
                 AtomicBoolean stop) {
            this.sqsClient = sqsClient;
            this.queueUrl = queueUrl;
            this.consumedCount = consumedCount;
            this.stop = stop;
        }

        /*
         * Each consumer thread receives and deletes messages until the main
         * thread stops the consumer thread. The consumedCount object tracks the
         * number of messages that are consumed by all consumer threads, and the
         * count is logged periodically.
         */
        public void run() {
            try {
                while (!stop.get()) {
                    try {
                        final ReceiveMessageResult result = sqsClient
                                .receiveMessage(new
                                        ReceiveMessageRequest(queueUrl));

                        if (!result.getMessages().isEmpty()) {
                            final Message m = result.getMessages().get(0);
                            sqsClient.deleteMessage(new
                                    DeleteMessageRequest(queueUrl,
                                    m.getReceiptHandle()));
                            consumedCount.incrementAndGet();
                        }
                    } catch (AmazonClientException e) {
                        log.error(e.getMessage());
                    }
                }
            } catch (AmazonClientException e) {
                /*
                 * By default, AmazonSQSClient retries calls 3 times before
                 * failing. If this unlikely condition occurs, stop.
                 */
                log.error("Consumer: " + e.getMessage());
                System.exit(1);
            }
        }
    }

    /**
     * The consumer thread uses {@code ReceiveMessage} and {@code
     * DeleteMessageBatch} to consume messages until it is stopped.
     */
    private static class BatchConsumer extends Thread {
        final AmazonSQS sqsClient;
        final String queueUrl;
        final int batchSize;
        final AtomicInteger consumedCount;
        final AtomicBoolean stop;

        BatchConsumer(AmazonSQS sqsClient, String queueUrl, int batchSize,
                      AtomicInteger consumedCount, AtomicBoolean stop) {
            this.sqsClient = sqsClient;
            this.queueUrl = queueUrl;
            this.batchSize = batchSize;
            this.consumedCount = consumedCount;
            this.stop = stop;
        }

        public void run() {
            try {
                while (!stop.get()) {
                    final ReceiveMessageResult result = sqsClient
                            .receiveMessage(new ReceiveMessageRequest(queueUrl)
                                    .withMaxNumberOfMessages(batchSize));

                    if (!result.getMessages().isEmpty()) {
                        final List<Message> messages = result.getMessages();
                        final DeleteMessageBatchRequest batchRequest =
                                new DeleteMessageBatchRequest()
                                        .withQueueUrl(queueUrl);

                        final List<DeleteMessageBatchRequestEntry> entries =
                                new ArrayList<DeleteMessageBatchRequestEntry>();
                        for (int i = 0, n = messages.size(); i < n; i++)
                            entries.add(new DeleteMessageBatchRequestEntry()
                                    .withId(Integer.toString(i))
                                    .withReceiptHandle(messages.get(i)
                                            .getReceiptHandle()));
                        batchRequest.setEntries(entries);

                        final DeleteMessageBatchResult batchResult = sqsClient
                                .deleteMessageBatch(batchRequest);
                        consumedCount.addAndGet(batchResult.getSuccessful().size());

                        /*
                         * Because DeleteMessageBatch can return successfully,
                         * but individual batch items fail, retry the failed
                         * batch items.
                         */
                        if (!batchResult.getFailed().isEmpty()) {
                            final int n = batchResult.getFailed().size();
                            log.warn("Producer: retrying deleting " + n
                                    + " messages");
                            for (BatchResultErrorEntry e : batchResult
                                    .getFailed()) {

                                sqsClient.deleteMessage(
                                        new DeleteMessageRequest(queueUrl,
                                                messages.get(Integer
                                                        .parseInt(e.getId()))
                                                        .getReceiptHandle()));

                                consumedCount.incrementAndGet();
                            }
                        }
                    }
                }
            } catch (AmazonClientException e) {
                /*
                 * By default, AmazonSQSClient retries calls 3 times before
                 * failing. If this unlikely condition occurs, stop.
                 */
                log.error("BatchConsumer: " + e.getMessage());
                System.exit(1);
            }
        }
    }

    /**
     * This thread prints every second the number of messages produced and
     * consumed so far.
     */
    private static class Monitor extends Thread {
        private final AtomicInteger producedCount;
        private final AtomicInteger consumedCount;
        private final AtomicBoolean stop;

        Monitor(AtomicInteger producedCount, AtomicInteger consumedCount,
                AtomicBoolean stop) {
            this.producedCount = producedCount;
            this.consumedCount = consumedCount;
            this.stop = stop;
        }

        public void run() {
            try {
                while (!stop.get()) {
                    Thread.sleep(1000);
                    log.info("produced messages = " + producedCount.get()
                            + ", consumed messages = " + consumedCount.get());
                }
            } catch (InterruptedException e) {
                // Allow the thread to exit.
            }
        }
    }
}
```

### 예제 실행에서 볼륨 측정치 모니터링
<a name="batch-request-java-example-monitoring-metrics"></a>

Amazon SQS는 전송, 수신 및 삭제된 메시지에 대한 볼륨 지표를 자동으로 생성합니다. 대기열의 **모니터링** 탭이나 [CloudWatch 콘솔](https://console.aws.amazon.com/cloudwatch/home)을 통해 이러한 지표 및 기타 지표에 액세스할 수 있습니다.

**참고**  
이 측정치의 경우 대기열을 사용할 수 있을 때까지 최대 15분이 걸릴 수 있습니다.