

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Tindakan batch Amazon SQS
<a name="sqs-batch-api-actions"></a>

Amazon SQS menyediakan tindakan batch untuk membantu Anda mengurangi biaya dan memanipulasi hingga 10 pesan dengan satu tindakan. Tindakan batch ini meliputi:
+ `[SendMessageBatch](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessageBatch.html)`
+ `[DeleteMessageBatch](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_DeleteMessageBatch.html)`
+ `[ChangeMessageVisibilityBatch](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ChangeMessageVisibilityBatch.html)`

Dengan menggunakan tindakan batch, Anda dapat melakukan beberapa operasi dalam satu panggilan API, yang membantu mengoptimalkan kinerja dan mengurangi biaya. Anda dapat memanfaatkan fungsionalitas batch menggunakan API kueri atau AWS SDK apa pun yang mendukung tindakan batch Amazon SQS.

**Detail Penting**
+ **Batas Ukuran Pesan:** Ukuran total semua pesan yang dikirim dalam satu `SendMessageBatch` panggilan tidak boleh melebihi 1.048.576 byte (1 MiB)
+ **Izin:** Anda tidak dapat menetapkan izin secara eksplisit untuk`SendMessageBatch`,, atau. `DeleteMessageBatch` `ChangeMessageVisibilityBatch` Sebagai gantinya, menyetel izin untuk `SendMessage``DeleteMessage`,, atau `ChangeMessageVisibility` menetapkan izin untuk versi batch tindakan yang sesuai.
+ **Dukungan Konsol:** Konsol Amazon SQS tidak mendukung tindakan batch. Anda harus menggunakan API kueri atau AWS SDK untuk melakukan operasi batch.

## Tindakan pesan batching
<a name="batching-message-actions"></a>

Untuk lebih mengoptimalkan biaya dan efisiensi, pertimbangkan praktik terbaik berikut untuk tindakan pengelompokan pesan:
+ **Tindakan API Batch:** Gunakan tindakan [tindakan API batch Amazon SQS](#sqs-batch-api-actions) untuk mengirim, menerima, dan menghapus pesan, serta mengubah batas waktu visibilitas pesan untuk beberapa pesan dengan satu tindakan. Ini mengurangi jumlah panggilan API dan biaya terkait.
+ **Buffering Sisi Klien dan Polling Panjang:** [Gabungkan buffering sisi klien dengan batch permintaan dengan menggunakan polling panjang bersama dengan klien asinkron buffer yang disertakan dengan.](sqs-client-side-buffering-request-batching.md) AWS SDK untuk Java Pendekatan ini membantu meminimalkan jumlah permintaan dan mengoptimalkan penanganan pesan dalam jumlah besar.

**catatan**  
Klien Asinkron Buffered Amazon SQS saat ini tidak mendukung antrian FIFO.

# Mengaktifkan buffering sisi klien dan batching permintaan dengan Amazon SQS
<a name="sqs-client-side-buffering-request-batching"></a>

[AWS SDK untuk Java](https://aws.amazon.com/sdkforjava/)Termasuk `AmazonSQSBufferedAsyncClient` yang mengakses Amazon SQS. Klien ini memungkinkan batching permintaan sederhana menggunakan buffering sisi klien. Panggilan yang dilakukan dari klien pertama kali di-buffer dan kemudian dikirim sebagai permintaan batch ke Amazon SQS.

Buffering sisi klien memungkinkan hingga 10 permintaan untuk di-buffer dan dikirim sebagai permintaan batch, mengurangi biaya penggunaan Amazon SQS dan mengurangi jumlah permintaan yang dikirim. `AmazonSQSBufferedAsyncClient`buffer panggilan sinkron dan asinkron. Permintaan batch dan dukungan untuk [polling panjang](sqs-short-and-long-polling.md) juga dapat membantu meningkatkan throughput. Untuk informasi selengkapnya, lihat [Meningkatkan throughput menggunakan penskalaan horizontal dan batching aksi dengan Amazon SQS](sqs-throughput-horizontal-scaling-and-batching.md).

Karena `AmazonSQSBufferedAsyncClient` mengimplementasikan antarmuka yang sama seperti`AmazonSQSAsyncClient`, migrasi dari `AmazonSQSAsyncClient` ke `AmazonSQSBufferedAsyncClient` biasanya hanya memerlukan sedikit perubahan pada kode Anda yang ada.

**catatan**  
Klien Asinkron Buffered Amazon SQS saat ini tidak mendukung antrian FIFO.

## Menggunakan Amazon SQSBuffered AsyncClient
<a name="using-buffered-async-client"></a>

Sebelum memulai, selesaikan langkah-langkah di [Menyiapkan Amazon SQS](sqs-setting-up.md). 

### AWS SDK for Java 1.x
<a name="using-buffered-async-client-java1"></a>

Untuk AWS SDK for Java 1.x, Anda dapat membuat yang `AmazonSQSBufferedAsyncClient` baru berdasarkan contoh berikut:

```
// Create the basic Amazon SQS async client
final AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient();
 
// Create the buffered client
final AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync);
```

Setelah Anda membuat yang baru`AmazonSQSBufferedAsyncClient`, Anda dapat menggunakannya untuk mengirim beberapa permintaan ke Amazon SQS (seperti yang Anda bisa dengan`AmazonSQSAsyncClient`), misalnya:

```
final CreateQueueRequest createRequest = new CreateQueueRequest().withQueueName("MyQueue");
 
final CreateQueueResult res = bufferedSqs.createQueue(createRequest);
 
final SendMessageRequest request = new SendMessageRequest();
final String body = "Your message text" + System.currentTimeMillis();
request.setMessageBody( body );
request.setQueueUrl(res.getQueueUrl());
 
final Future<SendMessageResult> sendResult = bufferedSqs.sendMessageAsync(request);
 
final ReceiveMessageRequest receiveRq = new ReceiveMessageRequest()
    .withMaxNumberOfMessages(1)
    .withQueueUrl(queueUrl);
final ReceiveMessageResult rx = bufferedSqs.receiveMessage(receiveRq);
```

### Mengkonfigurasi Amazon SQSBuffered AsyncClient
<a name="configuring-buffered-async-client"></a>

`AmazonSQSBufferedAsyncClient`telah dikonfigurasi sebelumnya dengan pengaturan yang berfungsi untuk sebagian besar kasus penggunaan. Anda dapat mengkonfigurasi lebih lanjut`AmazonSQSBufferedAsyncClient`, misalnya:

1. Buat instance `QueueBufferConfig` kelas dengan parameter konfigurasi yang diperlukan.

1. Berikan instance ke `AmazonSQSBufferedAsyncClient` konstruktor.

```
// Create the basic Amazon SQS async client
final AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient();
 
final QueueBufferConfig config = new QueueBufferConfig()
    .withMaxInflightReceiveBatches(5)
    .withMaxDoneReceiveBatches(15);
 
// Create the buffered client
final AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync, config);
```


**QueueBufferConfig parameter konfigurasi**  

| Parameter | Nilai default | Deskripsi | 
| --- | --- | --- | 
| longPoll | true |  Ketika `longPoll` diatur ke`true`, `AmazonSQSBufferedAsyncClient` mencoba untuk menggunakan polling panjang ketika mengkonsumsi pesan.  | 
| longPollWaitTimeoutSeconds | 20 s |  Jumlah waktu maksimum (dalam detik) yang diblokir `ReceiveMessage` panggilan di server, menunggu pesan muncul dalam antrian sebelum kembali dengan hasil penerimaan kosong.  Ketika polling panjang dinonaktifkan, pengaturan ini tidak berpengaruh.   | 
| maxBatchOpenMs | 200 ms |  Jumlah waktu maksimum (dalam milidetik) panggilan keluar menunggu panggilan lain yang dengannya ia mengumpulkan pesan dari jenis yang sama. Semakin tinggi pengaturan, semakin sedikit batch yang diperlukan untuk melakukan jumlah pekerjaan yang sama (namun, panggilan pertama dalam batch harus menghabiskan waktu lebih lama menunggu). Saat Anda menyetel parameter ini`0`, permintaan yang dikirimkan tidak menunggu permintaan lain, yang secara efektif menonaktifkan batching.  | 
| maxBatchSize | 10 permintaan per batch |  Jumlah maksimum pesan yang dikumpulkan bersama dalam satu permintaan. Semakin tinggi pengaturan, semakin sedikit batch yang diperlukan untuk melakukan jumlah permintaan yang sama.  10 permintaan per batch adalah nilai maksimum yang diizinkan untuk Amazon SQS.   | 
| maxBatchSizeBytes | 1 MiB |  Ukuran maksimum kumpulan pesan, dalam byte, yang coba dikirim klien ke Amazon SQS.  1 MiB adalah nilai maksimum yang diizinkan untuk Amazon SQS.   | 
| maxDoneReceiveBatches | 10 batch |  Jumlah maksimum batch penerima yang `AmazonSQSBufferedAsyncClient` mengambil dan menyimpan sisi klien. Semakin tinggi pengaturan, semakin banyak permintaan penerimaan yang dapat dipenuhi tanpa harus melakukan panggilan ke Amazon SQS (namun, semakin banyak pesan yang diambil sebelumnya, semakin lama mereka tetap berada di buffer, menyebabkan batas waktu visibilitas mereka sendiri kedaluwarsa).  `0`menunjukkan bahwa semua pesan pra-pengambilan dinonaktifkan dan pesan hanya dikonsumsi sesuai permintaan.   | 
| maxInflightOutboundBatches | 5 batch |  Jumlah maksimum batch keluar aktif yang dapat diproses secara bersamaan. Semakin tinggi pengaturan, batch keluar yang lebih cepat dapat dikirim (tunduk pada kuota seperti CPU atau bandwidth) dan semakin banyak thread yang dikonsumsi oleh. `AmazonSQSBufferedAsyncClient`  | 
| maxInflightReceiveBatches | 10 batch |  Jumlah maksimum batch penerima aktif yang dapat diproses pada saat yang bersamaan. Semakin tinggi pengaturan, semakin banyak pesan yang dapat diterima (tunduk pada kuota seperti CPU atau bandwidth), dan semakin banyak utas yang dikonsumsi`AmazonSQSBufferedAsyncClient`.  `0`menunjukkan bahwa semua pesan pra-pengambilan dinonaktifkan dan pesan hanya dikonsumsi sesuai permintaan.   | 
| visibilityTimeoutSeconds | -1 |  Ketika parameter ini disetel ke nilai positif, bukan nol, batas waktu visibilitas yang ditetapkan di sini akan mengganti batas waktu visibilitas yang ditetapkan pada antrian dari mana pesan dikonsumsi.  `-1`menunjukkan bahwa pengaturan default dipilih untuk antrian. Anda tidak dapat mengatur batas waktu visibilitas ke. `0`   | 

### AWS SDK for Java 2.x
<a name="using-buffered-async-client-java2"></a>

Untuk AWS SDK for Java 2.x, Anda dapat membuat yang `SqsAsyncBatchManager` baru berdasarkan contoh berikut:

```
// Create the basic Sqs Async Client
SqsAsyncClient sqs = SqsAsyncClient.builder() 
    .region(Region.US_EAST_1) 
    .build();

// Create the batch manager
SqsAsyncBatchManager sqsAsyncBatchManager = sqs.batchManager();
```

Setelah Anda membuat yang baru`SqsAsyncBatchManager`, Anda dapat menggunakannya untuk mengirim beberapa permintaan ke Amazon SQS (seperti yang Anda bisa dengan`SqsAsyncClient`), misalnya:

```
final String queueName = "MyAsyncBufferedQueue" + UUID.randomUUID();
final CreateQueueRequest request = CreateQueueRequest.builder().queueName(queueName).build();
final String queueUrl = sqs.createQueue(request).join().queueUrl();
System.out.println("Queue created: " + queueUrl);


// Send messages
CompletableFuture<SendMessageResponse> sendMessageFuture;
for (int i = 0; i < 10; i++) {
    final int index = i;
    sendMessageFuture = sqsAsyncBatchManager.sendMessage(
            r -> r.messageBody("Message " + index).queueUrl(queueUrl));
    SendMessageResponse response= sendMessageFuture.join();
    System.out.println("Message " + response.messageId() + " sent!");
}

// Receive messages with customized configurations
CompletableFuture<ReceiveMessageResponse> receiveResponseFuture = customizedBatchManager.receiveMessage(
        r -> r.queueUrl(queueUrl)
                .waitTimeSeconds(10)
                .visibilityTimeout(20)
                .maxNumberOfMessages(10)
);
System.out.println("You have received " + receiveResponseFuture.join().messages().size() + " messages in total.");

// Delete messages
DeleteQueueRequest deleteQueueRequest =  DeleteQueueRequest.builder().queueUrl(queueUrl).build();
int code = sqs.deleteQueue(deleteQueueRequest).join().sdkHttpResponse().statusCode();
System.out.println("Queue is deleted, with statusCode " + code);
```

### Mengkonfigurasi SqsAsyncBatchManager
<a name="configuring-SqsAsyncBatchManager"></a>

`SqsAsyncBatchManager`telah dikonfigurasi sebelumnya dengan pengaturan yang berfungsi untuk sebagian besar kasus penggunaan. Anda dapat mengkonfigurasi lebih lanjut`SqsAsyncBatchManager`, misalnya:

Membuat konfigurasi khusus melalui`SqsAsyncBatchManager.Builder`:

```
SqsAsyncBatchManager customizedBatchManager = SqsAsyncBatchManager.builder() 
    .client(sqs)
    .scheduledExecutor(Executors.newScheduledThreadPool(5))
    .overrideConfiguration(b -> b 
        .maxBatchSize(10)
        .sendRequestFrequency(Duration.ofMillis(200))
        .receiveMessageMinWaitDuration(Duration.ofSeconds(10))
        .receiveMessageVisibilityTimeout(Duration.ofSeconds(20)) 
        .receiveMessageAttributeNames(Collections.singletonList("*"))
        .receiveMessageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.ALL)))
    .build();
```


**Parameter `BatchOverrideConfiguration`**  

| Parameter | Nilai default | Deskripsi | 
| --- | --- | --- | 
| maxBatchSize |  10 permintaan per batch  | Jumlah maksimum pesan yang dikumpulkan bersama dalam satu permintaan. Semakin tinggi pengaturan, semakin sedikit batch yang diperlukan untuk melakukan jumlah permintaan yang sama.  Nilai maksimum yang diizinkan untuk Amazon SQS adalah 10 permintaan per batch.  | 
| sendRequestFrequency |  200 ms  | Jumlah waktu maksimum (dalam milidetik) panggilan keluar menunggu panggilan lain yang dengannya ia mengumpulkan pesan dari jenis yang sama. Semakin tinggi pengaturan, semakin sedikit batch yang diperlukan untuk melakukan jumlah pekerjaan yang sama (namun, panggilan pertama dalam batch harus menghabiskan waktu lebih lama menunggu). Saat Anda menyetel parameter ini`0`, permintaan yang dikirimkan tidak menunggu permintaan lain, yang secara efektif menonaktifkan batching. | 
| receiveMessageVisibilityTimeout |  -1  | Ketika parameter ini disetel ke nilai positif, bukan nol, batas waktu visibilitas yang ditetapkan di sini akan mengganti batas waktu visibilitas yang ditetapkan pada antrian dari mana pesan dikonsumsi.   `1`menunjukkan bahwa pengaturan default dipilih untuk antrian. Anda tidak dapat mengatur batas waktu visibilitas ke. `0`   | 
| receiveMessageMinWaitDuration |  50 ms  | Jumlah waktu minimal (dalam milidetik) `receiveMessage` panggilan menunggu pesan yang tersedia diambil. Semakin tinggi pengaturan, semakin sedikit batch yang diperlukan untuk melakukan jumlah permintaan yang sama.  | 

# Meningkatkan throughput menggunakan penskalaan horizontal dan batching aksi dengan Amazon SQS
<a name="sqs-throughput-horizontal-scaling-and-batching"></a>

Amazon SQS mendukung perpesanan throughput tinggi. Untuk detail tentang batas throughput, lihat. [Kuota pesan Amazon SQS](quotas-messages.md)

Untuk memaksimalkan throughput:
+ [Skalakan](#horizontal-scaling) produsen dan konsumen secara horizontal dengan menambahkan lebih banyak contoh masing-masing.
+ Gunakan [batch tindakan](#request-batching) untuk mengirim atau menerima beberapa pesan dalam satu permintaan, sehingga mengurangi overhead panggilan API.

## Penskalaan horizontal
<a name="horizontal-scaling"></a>

Karena Anda mengakses Amazon SQS melalui protokol permintaan-respons HTTP, *latensi permintaan* (interval antara memulai permintaan dan menerima respons) membatasi throughput yang dapat Anda capai dari satu utas menggunakan satu koneksi. Misalnya, jika latensi dari klien berbasis Amazon EC2 ke Amazon SQS di wilayah yang sama rata-rata 20 ms, throughput maksimum dari satu utas melalui satu koneksi rata-rata 50 TPS. 

*Penskalaan horizontal* melibatkan peningkatan jumlah produsen pesan (yang membuat `[SendMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html)` permintaan) dan konsumen (yang membuat `[ReceiveMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html)` dan `[DeleteMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_DeleteMessage.html)` meminta) untuk meningkatkan throughput antrian Anda secara keseluruhan. Anda dapat menskalakan secara horizontal dengan tiga cara:
+ Meningkatkan jumlah thread per klien
+ Tambahkan lebih banyak klien
+ Tingkatkan jumlah thread per klien dan tambahkan lebih banyak klien

Ketika Anda menambahkan lebih banyak klien, Anda mencapai keuntungan linier pada dasarnya dalam throughput antrian. Misalnya, jika Anda menggandakan jumlah klien, Anda juga menggandakan throughput. 

## Aksi batching
<a name="request-batching"></a>

*Batching* melakukan lebih banyak pekerjaan selama setiap perjalanan pulang pergi ke layanan (misalnya, ketika Anda mengirim beberapa pesan dengan satu `SendMessageBatch` permintaan). Tindakan batch Amazon SQS adalah`[SendMessageBatch](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessageBatch.html)`,`[DeleteMessageBatch](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_DeleteMessageBatch.html)`, dan. `[ChangeMessageVisibilityBatch](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ChangeMessageVisibilityBatch.html)` Untuk memanfaatkan batching tanpa mengubah produsen atau konsumen, Anda dapat menggunakan [Amazon SQS Buffered](sqs-client-side-buffering-request-batching.md) Asynchronous Client.

**catatan**  
Karena `[ReceiveMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html)` dapat memproses 10 pesan sekaligus, tidak ada `ReceiveMessageBatch` tindakan.

Batching mendistribusikan latensi tindakan batch melalui beberapa pesan dalam permintaan batch, daripada menerima seluruh latensi untuk satu pesan (misalnya, permintaan). `[SendMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html)` Karena setiap perjalanan pulang pergi membawa lebih banyak pekerjaan, permintaan batch membuat penggunaan thread dan koneksi lebih efisien, meningkatkan throughput.

Anda dapat menggabungkan batching dengan penskalaan horizontal untuk menyediakan throughput dengan thread, koneksi, dan permintaan yang lebih sedikit daripada permintaan pesan individual. Anda dapat menggunakan tindakan Amazon SQS batch untuk mengirim, menerima, atau menghapus hingga 10 pesan sekaligus. Karena Amazon SQS mengenakan biaya berdasarkan permintaan, batching dapat secara substansional mengurangi biaya Anda. 

Batching dapat menimbulkan beberapa kerumitan untuk aplikasi Anda (misalnya, aplikasi Anda harus mengumpulkan pesan sebelum mengirimnya, atau terkadang harus menunggu lebih lama untuk respons). Namun, batching masih bisa efektif dalam kasus-kasus berikut: 
+ Aplikasi Anda menghasilkan banyak pesan dalam waktu singkat, sehingga penundaan tidak pernah terlalu lama. 
+ Konsumen pesan mengambil pesan dari antrian atas kebijakannya sendiri, tidak seperti produsen pesan biasa yang perlu mengirim pesan sebagai respons terhadap peristiwa yang tidak mereka kendalikan. 

**penting**  
Permintaan batch mungkin berhasil meskipun pesan individual dalam batch gagal. Setelah permintaan batch, selalu periksa kegagalan pesan individual dan coba lagi tindakan jika perlu.

## Contoh Java yang berfungsi untuk permintaan operasi tunggal dan batch
<a name="working-java-example-batch-requests"></a>

### Prasyarat
<a name="batch-request-java-example-prerequisites"></a>

Tambahkan`aws-java-sdk-sqs.jar`,`aws-java-sdk-ec2.jar`, dan `commons-logging.jar` paket ke jalur kelas build Java Anda. Contoh berikut menampilkan dependensi ini dalam file `pom.xml` proyek Maven.

```
<dependencies>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-sqs</artifactId>
        <version>LATEST</version>
    </dependency>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-ec2</artifactId>
        <version>LATEST</version>
    </dependency>
    <dependency>
        <groupId>commons-logging</groupId>
        <artifactId>commons-logging</artifactId>
        <version>LATEST</version>
    </dependency>
</dependencies>
```

### SimpleProducerConsumer.jawa
<a name="batch-request-java-example-code"></a>

Contoh kode Java berikut mengimplementasikan pola produsen-konsumen sederhana. Thread utama memunculkan sejumlah thread produsen dan konsumen yang memproses pesan 1 KB untuk waktu tertentu. Contoh ini mencakup produsen dan konsumen yang membuat permintaan operasi tunggal dan mereka yang membuat permintaan batch.

```
/*
 * Copyright 2010-2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  https://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 *
 */

import com.amazonaws.AmazonClientException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Start a specified number of producer and consumer threads, and produce-consume
 * for the least of the specified duration and 1 hour. Some messages can be left
 * in the queue because producers and consumers might not be in exact balance.
 */
public class SimpleProducerConsumer {

    // The maximum runtime of the program.
    private final static int MAX_RUNTIME_MINUTES = 60;
    private final static Log log = LogFactory.getLog(SimpleProducerConsumer.class);

    public static void main(String[] args) throws InterruptedException {

        final Scanner input = new Scanner(System.in);

        System.out.print("Enter the queue name: ");
        final String queueName = input.nextLine();

        System.out.print("Enter the number of producers: ");
        final int producerCount = input.nextInt();

        System.out.print("Enter the number of consumers: ");
        final int consumerCount = input.nextInt();

        System.out.print("Enter the number of messages per batch: ");
        final int batchSize = input.nextInt();

        System.out.print("Enter the message size in bytes: ");
        final int messageSizeByte = input.nextInt();

        System.out.print("Enter the run time in minutes: ");
        final int runTimeMinutes = input.nextInt();

        /*
         * Create a new instance of the builder with all defaults (credentials
         * and region) set automatically. For more information, see Creating
         * Service Clients in the AWS SDK for Java Developer Guide.
         */
        final ClientConfiguration clientConfiguration = new ClientConfiguration()
                .withMaxConnections(producerCount + consumerCount);

        final AmazonSQS sqsClient = AmazonSQSClientBuilder.standard()
                .withClientConfiguration(clientConfiguration)
                .build();

        final String queueUrl = sqsClient
                .getQueueUrl(new GetQueueUrlRequest(queueName)).getQueueUrl();

        // The flag used to stop producer, consumer, and monitor threads.
        final AtomicBoolean stop = new AtomicBoolean(false);

        // Start the producers.
        final AtomicInteger producedCount = new AtomicInteger();
        final Thread[] producers = new Thread[producerCount];
        for (int i = 0; i < producerCount; i++) {
            if (batchSize == 1) {
                producers[i] = new Producer(sqsClient, queueUrl, messageSizeByte,
                        producedCount, stop);
            } else {
                producers[i] = new BatchProducer(sqsClient, queueUrl, batchSize,
                        messageSizeByte, producedCount,
                        stop);
            }
            producers[i].start();
        }

        // Start the consumers.
        final AtomicInteger consumedCount = new AtomicInteger();
        final Thread[] consumers = new Thread[consumerCount];
        for (int i = 0; i < consumerCount; i++) {
            if (batchSize == 1) {
                consumers[i] = new Consumer(sqsClient, queueUrl, consumedCount,
                        stop);
            } else {
                consumers[i] = new BatchConsumer(sqsClient, queueUrl, batchSize,
                        consumedCount, stop);
            }
            consumers[i].start();
        }

        // Start the monitor thread.
        final Thread monitor = new Monitor(producedCount, consumedCount, stop);
        monitor.start();

        // Wait for the specified amount of time then stop.
        Thread.sleep(TimeUnit.MINUTES.toMillis(Math.min(runTimeMinutes,
                MAX_RUNTIME_MINUTES)));
        stop.set(true);

        // Join all threads.
        for (int i = 0; i < producerCount; i++) {
            producers[i].join();
        }

        for (int i = 0; i < consumerCount; i++) {
            consumers[i].join();
        }

        monitor.interrupt();
        monitor.join();
    }

    private static String makeRandomString(int sizeByte) {
        final byte[] bs = new byte[(int) Math.ceil(sizeByte * 5 / 8)];
        new Random().nextBytes(bs);
        bs[0] = (byte) ((bs[0] | 64) & 127);
        return new BigInteger(bs).toString(32);
    }

    /**
     * The producer thread uses {@code SendMessage}
     * to send messages until it is stopped.
     */
    private static class Producer extends Thread {
        final AmazonSQS sqsClient;
        final String queueUrl;
        final AtomicInteger producedCount;
        final AtomicBoolean stop;
        final String theMessage;

        Producer(AmazonSQS sqsQueueBuffer, String queueUrl, int messageSizeByte,
                 AtomicInteger producedCount, AtomicBoolean stop) {
            this.sqsClient = sqsQueueBuffer;
            this.queueUrl = queueUrl;
            this.producedCount = producedCount;
            this.stop = stop;
            this.theMessage = makeRandomString(messageSizeByte);
        }

        /*
         * The producedCount object tracks the number of messages produced by
         * all producer threads. If there is an error, the program exits the
         * run() method.
         */
        public void run() {
            try {
                while (!stop.get()) {
                    sqsClient.sendMessage(new SendMessageRequest(queueUrl,
                            theMessage));
                    producedCount.incrementAndGet();
                }
            } catch (AmazonClientException e) {
                /*
                 * By default, AmazonSQSClient retries calls 3 times before
                 * failing. If this unlikely condition occurs, stop.
                 */
                log.error("Producer: " + e.getMessage());
                System.exit(1);
            }
        }
    }

    /**
     * The producer thread uses {@code SendMessageBatch}
     * to send messages until it is stopped.
     */
    private static class BatchProducer extends Thread {
        final AmazonSQS sqsClient;
        final String queueUrl;
        final int batchSize;
        final AtomicInteger producedCount;
        final AtomicBoolean stop;
        final String theMessage;

        BatchProducer(AmazonSQS sqsQueueBuffer, String queueUrl, int batchSize,
                      int messageSizeByte, AtomicInteger producedCount,
                      AtomicBoolean stop) {
            this.sqsClient = sqsQueueBuffer;
            this.queueUrl = queueUrl;
            this.batchSize = batchSize;
            this.producedCount = producedCount;
            this.stop = stop;
            this.theMessage = makeRandomString(messageSizeByte);
        }

        public void run() {
            try {
                while (!stop.get()) {
                    final SendMessageBatchRequest batchRequest =
                            new SendMessageBatchRequest().withQueueUrl(queueUrl);

                    final List<SendMessageBatchRequestEntry> entries =
                            new ArrayList<SendMessageBatchRequestEntry>();
                    for (int i = 0; i < batchSize; i++)
                        entries.add(new SendMessageBatchRequestEntry()
                                .withId(Integer.toString(i))
                                .withMessageBody(theMessage));
                    batchRequest.setEntries(entries);

                    final SendMessageBatchResult batchResult =
                            sqsClient.sendMessageBatch(batchRequest);
                    producedCount.addAndGet(batchResult.getSuccessful().size());

                    /*
                     * Because SendMessageBatch can return successfully, but
                     * individual batch items fail, retry the failed batch items.
                     */
                    if (!batchResult.getFailed().isEmpty()) {
                        log.warn("Producer: retrying sending "
                                + batchResult.getFailed().size() + " messages");
                        for (int i = 0, n = batchResult.getFailed().size();
                             i < n; i++) {
                            sqsClient.sendMessage(new
                                    SendMessageRequest(queueUrl, theMessage));
                            producedCount.incrementAndGet();
                        }
                    }
                }
            } catch (AmazonClientException e) {
                /*
                 * By default, AmazonSQSClient retries calls 3 times before
                 * failing. If this unlikely condition occurs, stop.
                 */
                log.error("BatchProducer: " + e.getMessage());
                System.exit(1);
            }
        }
    }

    /**
     * The consumer thread uses {@code ReceiveMessage} and {@code DeleteMessage}
     * to consume messages until it is stopped.
     */
    private static class Consumer extends Thread {
        final AmazonSQS sqsClient;
        final String queueUrl;
        final AtomicInteger consumedCount;
        final AtomicBoolean stop;

        Consumer(AmazonSQS sqsClient, String queueUrl, AtomicInteger consumedCount,
                 AtomicBoolean stop) {
            this.sqsClient = sqsClient;
            this.queueUrl = queueUrl;
            this.consumedCount = consumedCount;
            this.stop = stop;
        }

        /*
         * Each consumer thread receives and deletes messages until the main
         * thread stops the consumer thread. The consumedCount object tracks the
         * number of messages that are consumed by all consumer threads, and the
         * count is logged periodically.
         */
        public void run() {
            try {
                while (!stop.get()) {
                    try {
                        final ReceiveMessageResult result = sqsClient
                                .receiveMessage(new
                                        ReceiveMessageRequest(queueUrl));

                        if (!result.getMessages().isEmpty()) {
                            final Message m = result.getMessages().get(0);
                            sqsClient.deleteMessage(new
                                    DeleteMessageRequest(queueUrl,
                                    m.getReceiptHandle()));
                            consumedCount.incrementAndGet();
                        }
                    } catch (AmazonClientException e) {
                        log.error(e.getMessage());
                    }
                }
            } catch (AmazonClientException e) {
                /*
                 * By default, AmazonSQSClient retries calls 3 times before
                 * failing. If this unlikely condition occurs, stop.
                 */
                log.error("Consumer: " + e.getMessage());
                System.exit(1);
            }
        }
    }

    /**
     * The consumer thread uses {@code ReceiveMessage} and {@code
     * DeleteMessageBatch} to consume messages until it is stopped.
     */
    private static class BatchConsumer extends Thread {
        final AmazonSQS sqsClient;
        final String queueUrl;
        final int batchSize;
        final AtomicInteger consumedCount;
        final AtomicBoolean stop;

        BatchConsumer(AmazonSQS sqsClient, String queueUrl, int batchSize,
                      AtomicInteger consumedCount, AtomicBoolean stop) {
            this.sqsClient = sqsClient;
            this.queueUrl = queueUrl;
            this.batchSize = batchSize;
            this.consumedCount = consumedCount;
            this.stop = stop;
        }

        public void run() {
            try {
                while (!stop.get()) {
                    final ReceiveMessageResult result = sqsClient
                            .receiveMessage(new ReceiveMessageRequest(queueUrl)
                                    .withMaxNumberOfMessages(batchSize));

                    if (!result.getMessages().isEmpty()) {
                        final List<Message> messages = result.getMessages();
                        final DeleteMessageBatchRequest batchRequest =
                                new DeleteMessageBatchRequest()
                                        .withQueueUrl(queueUrl);

                        final List<DeleteMessageBatchRequestEntry> entries =
                                new ArrayList<DeleteMessageBatchRequestEntry>();
                        for (int i = 0, n = messages.size(); i < n; i++)
                            entries.add(new DeleteMessageBatchRequestEntry()
                                    .withId(Integer.toString(i))
                                    .withReceiptHandle(messages.get(i)
                                            .getReceiptHandle()));
                        batchRequest.setEntries(entries);

                        final DeleteMessageBatchResult batchResult = sqsClient
                                .deleteMessageBatch(batchRequest);
                        consumedCount.addAndGet(batchResult.getSuccessful().size());

                        /*
                         * Because DeleteMessageBatch can return successfully,
                         * but individual batch items fail, retry the failed
                         * batch items.
                         */
                        if (!batchResult.getFailed().isEmpty()) {
                            final int n = batchResult.getFailed().size();
                            log.warn("Producer: retrying deleting " + n
                                    + " messages");
                            for (BatchResultErrorEntry e : batchResult
                                    .getFailed()) {

                                sqsClient.deleteMessage(
                                        new DeleteMessageRequest(queueUrl,
                                                messages.get(Integer
                                                        .parseInt(e.getId()))
                                                        .getReceiptHandle()));

                                consumedCount.incrementAndGet();
                            }
                        }
                    }
                }
            } catch (AmazonClientException e) {
                /*
                 * By default, AmazonSQSClient retries calls 3 times before
                 * failing. If this unlikely condition occurs, stop.
                 */
                log.error("BatchConsumer: " + e.getMessage());
                System.exit(1);
            }
        }
    }

    /**
     * This thread prints every second the number of messages produced and
     * consumed so far.
     */
    private static class Monitor extends Thread {
        private final AtomicInteger producedCount;
        private final AtomicInteger consumedCount;
        private final AtomicBoolean stop;

        Monitor(AtomicInteger producedCount, AtomicInteger consumedCount,
                AtomicBoolean stop) {
            this.producedCount = producedCount;
            this.consumedCount = consumedCount;
            this.stop = stop;
        }

        public void run() {
            try {
                while (!stop.get()) {
                    Thread.sleep(1000);
                    log.info("produced messages = " + producedCount.get()
                            + ", consumed messages = " + consumedCount.get());
                }
            } catch (InterruptedException e) {
                // Allow the thread to exit.
            }
        }
    }
}
```

### Memantau metrik volume dari contoh yang dijalankan
<a name="batch-request-java-example-monitoring-metrics"></a>

Amazon SQS secara otomatis menghasilkan metrik volume untuk pesan yang dikirim, diterima, dan dihapus. [Anda dapat mengakses metrik tersebut dan lainnya melalui tab **Monitoring** untuk antrian Anda atau di CloudWatch konsol.](https://console.aws.amazon.com/cloudwatch/home)

**catatan**  
Metrik dapat memakan waktu hingga 15 menit setelah antrian mulai tersedia.