Aumento da produtividade usando escalabilidade horizontal e agrupamento de ações com a Amazon SQS - Amazon Simple Queue Service

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Aumento da produtividade usando escalabilidade horizontal e agrupamento de ações com a Amazon SQS

SQSAs filas da Amazon podem oferecer uma taxa de transferência muito alta. Para obter mais informações sobre cotas de taxa de transferência, consulte Cotas de mensagens do Amazon SQS.

Para atingir uma taxa de transferência alta, você deve dimensionar os produtores de mensagens e os consumidores horizontalmente (adicionar mais produtores e consumidores).

Escalabilidade horizontal

Como você acessa a Amazon SQS por meio de um protocolo de HTTP solicitação-resposta, a latência da solicitação (o intervalo entre iniciar uma solicitação e receber uma resposta) limita a taxa de transferência que você pode obter de um único thread usando uma única conexão. Por exemplo, se a latência de um cliente EC2 baseado na Amazon para a Amazon SQS na mesma região for em média de 20 ms, a taxa de transferência máxima de um único thread em uma única conexão será em média de 50. TPS

A escalabilidade horizontal envolve o aumento do número de produtores de mensagem (que fazem a solicitação SendMessage) e dos consumidores (que fazem solicitações ReceiveMessage e DeleteMessage) para aumentar sua taxa de transferência de fila geral. Você pode escalar horizontalmente de três formas:

  • Aumentar o número de threads por cliente

  • Adicionar mais clientes

  • Aumentar o número de threads por cliente e adicionar mais clientes

Ao adicionar mais clientes, você obtém ganhos essencialmente lineares na taxa de transferência da fila. Por exemplo, se você dobrar o número de clientes, terá duas vezes a taxa de transferência.

nota

Ao escalar horizontalmente, certifique-se de que seu SQS cliente da Amazon tenha conexões ou threads suficientes para suportar o número de produtores de mensagens e consumidores simultâneos que enviam solicitações e recebem respostas. Por exemplo, por padrão, as instâncias da AWS SDK for Java AmazonSQSClient classe mantêm no máximo 50 conexões com a AmazonSQS. Para criar produtores e consumidores simultâneos adicionais, você precisa ajustar o número máximo de threads de produtores e consumidores permitidos em um objeto AmazonSQSClientBuilder, por exemplo:

final AmazonSQS sqsClient = AmazonSQSClientBuilder.standard() .withClientConfiguration(new ClientConfiguration() .withMaxConnections(producerCount + consumerCount)) .build();

Para AmazonSQSAsyncClient, você também precisa ter certeza de que há threads suficientes disponíveis.

Esse exemplo só funciona para Java v. 1.x.

Processamento de ações em lotes

O processamento em lotes executa mais trabalho durante a ida e a volta do serviço (por exemplo, quando você envia várias mensagens com uma única solicitação SendMessageBatch). As ações SQS em lote da Amazon são SendMessageBatchDeleteMessageBatch, ChangeMessageVisibilityBatch e. Para aproveitar os lotes sem alterar seus produtores ou consumidores, você pode usar o Amazon SQS Buffered Asynchronous Client.

nota

Como ReceiveMessage pode processar 10 mensagens por vez, não há nenhuma ação ReceiveMessageBatch.

O processamento em lotes distribui a latência da ação de lote nas várias mensagens de uma solicitação em lote em vez de aceitar toda a latência para uma única mensagem (por exemplo, uma solicitação SendMessage). Como cada ida e volta carrega mais trabalho, as solicitações de lote tornam mais eficiente o uso de threads e conexões, melhorando, dessa forma, a taxa de transferência.

Você pode combinar processamentos em lote com escalabilidade horizontal para fornecer taxa de transferência com menos threads, conexões e solicitações em comparação com as solicitações de mensagens individuais. Você pode usar SQS ações em lotes da Amazon para enviar, receber ou excluir até 10 mensagens por vez. Como a Amazon SQS cobra de acordo com a solicitação, o envio em lotes pode reduzir substancialmente seus custos.

O processamento em lotes pode criar certa complexidade para o seu aplicativo (por exemplo, o aplicativo precisa acumular as mensagens antes de enviá-las e, às vezes, precisará esperar mais por uma resposta). No entanto, o processamento em lotes pode ser eficaz nos seguintes casos:

  • Seu aplicativo gera muitas mensagens em um curto intervalo de tempo, portanto, o atraso nunca é muito longo.

  • Um consumidor de mensagem busca as mensagens de uma fila a seu critério, ao contrário de produtores de mensagem típicos que precisam enviar mensagens em resposta a eventos que eles não controlam.

Importante

Uma solicitação de lote pode ser bem-sucedida, mesmo que ocorra falha nas mensagens individuais no lote. Após uma solicitação de lote, você sempre deve verificar a existência de falhas em mensagens individuais e repetir a ação, se necessário.

Exemplo de Java funcional para operações únicas e solicitações em lote

Pré-requisitos

Adicione os pacotes aws-java-sdk-sqs.jar, aws-java-sdk-ec2.jar e commons-logging.jar ao caminho da classe de compilação do Java. O exemplo a seguir mostra essas dependências em um arquivo pom.xml do projeto Maven.

<dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-sqs</artifactId> <version>LATEST</version> </dependency> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-ec2</artifactId> <version>LATEST</version> </dependency> <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>LATEST</version> </dependency> </dependencies>

SimpleProducerConsumer.java

O exemplo de código Java a seguir implementa um padrão simples de produtor-consumidor. O thread principal gera um número de threads de produtor e consumidor que processam mensagens de 1 KB em um determinado momento. Ele inclui produtores e os consumidores que fazem solicitações de operação únicas e outros que fazem solicitações de lote.

/* * Copyright 2010-2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * https://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. * */ import com.amazonaws.AmazonClientException; import com.amazonaws.ClientConfiguration; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.model.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import java.math.BigInteger; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.Scanner; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** * Start a specified number of producer and consumer threads, and produce-consume * for the least of the specified duration and 1 hour. Some messages can be left * in the queue because producers and consumers might not be in exact balance. */ public class SimpleProducerConsumer { // The maximum runtime of the program. private final static int MAX_RUNTIME_MINUTES = 60; private final static Log log = LogFactory.getLog(SimpleProducerConsumer.class); public static void main(String[] args) throws InterruptedException { final Scanner input = new Scanner(System.in); System.out.print("Enter the queue name: "); final String queueName = input.nextLine(); System.out.print("Enter the number of producers: "); final int producerCount = input.nextInt(); System.out.print("Enter the number of consumers: "); final int consumerCount = input.nextInt(); System.out.print("Enter the number of messages per batch: "); final int batchSize = input.nextInt(); System.out.print("Enter the message size in bytes: "); final int messageSizeByte = input.nextInt(); System.out.print("Enter the run time in minutes: "); final int runTimeMinutes = input.nextInt(); /* * Create a new instance of the builder with all defaults (credentials * and region) set automatically. For more information, see Creating * Service Clients in the AWS SDK for Java Developer Guide. */ final ClientConfiguration clientConfiguration = new ClientConfiguration() .withMaxConnections(producerCount + consumerCount); final AmazonSQS sqsClient = AmazonSQSClientBuilder.standard() .withClientConfiguration(clientConfiguration) .build(); final String queueUrl = sqsClient .getQueueUrl(new GetQueueUrlRequest(queueName)).getQueueUrl(); // The flag used to stop producer, consumer, and monitor threads. final AtomicBoolean stop = new AtomicBoolean(false); // Start the producers. final AtomicInteger producedCount = new AtomicInteger(); final Thread[] producers = new Thread[producerCount]; for (int i = 0; i < producerCount; i++) { if (batchSize == 1) { producers[i] = new Producer(sqsClient, queueUrl, messageSizeByte, producedCount, stop); } else { producers[i] = new BatchProducer(sqsClient, queueUrl, batchSize, messageSizeByte, producedCount, stop); } producers[i].start(); } // Start the consumers. final AtomicInteger consumedCount = new AtomicInteger(); final Thread[] consumers = new Thread[consumerCount]; for (int i = 0; i < consumerCount; i++) { if (batchSize == 1) { consumers[i] = new Consumer(sqsClient, queueUrl, consumedCount, stop); } else { consumers[i] = new BatchConsumer(sqsClient, queueUrl, batchSize, consumedCount, stop); } consumers[i].start(); } // Start the monitor thread. final Thread monitor = new Monitor(producedCount, consumedCount, stop); monitor.start(); // Wait for the specified amount of time then stop. Thread.sleep(TimeUnit.MINUTES.toMillis(Math.min(runTimeMinutes, MAX_RUNTIME_MINUTES))); stop.set(true); // Join all threads. for (int i = 0; i < producerCount; i++) { producers[i].join(); } for (int i = 0; i < consumerCount; i++) { consumers[i].join(); } monitor.interrupt(); monitor.join(); } private static String makeRandomString(int sizeByte) { final byte[] bs = new byte[(int) Math.ceil(sizeByte * 5 / 8)]; new Random().nextBytes(bs); bs[0] = (byte) ((bs[0] | 64) & 127); return new BigInteger(bs).toString(32); } /** * The producer thread uses {@code SendMessage} * to send messages until it is stopped. */ private static class Producer extends Thread { final AmazonSQS sqsClient; final String queueUrl; final AtomicInteger producedCount; final AtomicBoolean stop; final String theMessage; Producer(AmazonSQS sqsQueueBuffer, String queueUrl, int messageSizeByte, AtomicInteger producedCount, AtomicBoolean stop) { this.sqsClient = sqsQueueBuffer; this.queueUrl = queueUrl; this.producedCount = producedCount; this.stop = stop; this.theMessage = makeRandomString(messageSizeByte); } /* * The producedCount object tracks the number of messages produced by * all producer threads. If there is an error, the program exits the * run() method. */ public void run() { try { while (!stop.get()) { sqsClient.sendMessage(new SendMessageRequest(queueUrl, theMessage)); producedCount.incrementAndGet(); } } catch (AmazonClientException e) { /* * By default, AmazonSQSClient retries calls 3 times before * failing. If this unlikely condition occurs, stop. */ log.error("Producer: " + e.getMessage()); System.exit(1); } } } /** * The producer thread uses {@code SendMessageBatch} * to send messages until it is stopped. */ private static class BatchProducer extends Thread { final AmazonSQS sqsClient; final String queueUrl; final int batchSize; final AtomicInteger producedCount; final AtomicBoolean stop; final String theMessage; BatchProducer(AmazonSQS sqsQueueBuffer, String queueUrl, int batchSize, int messageSizeByte, AtomicInteger producedCount, AtomicBoolean stop) { this.sqsClient = sqsQueueBuffer; this.queueUrl = queueUrl; this.batchSize = batchSize; this.producedCount = producedCount; this.stop = stop; this.theMessage = makeRandomString(messageSizeByte); } public void run() { try { while (!stop.get()) { final SendMessageBatchRequest batchRequest = new SendMessageBatchRequest().withQueueUrl(queueUrl); final List<SendMessageBatchRequestEntry> entries = new ArrayList<SendMessageBatchRequestEntry>(); for (int i = 0; i < batchSize; i++) entries.add(new SendMessageBatchRequestEntry() .withId(Integer.toString(i)) .withMessageBody(theMessage)); batchRequest.setEntries(entries); final SendMessageBatchResult batchResult = sqsClient.sendMessageBatch(batchRequest); producedCount.addAndGet(batchResult.getSuccessful().size()); /* * Because SendMessageBatch can return successfully, but * individual batch items fail, retry the failed batch items. */ if (!batchResult.getFailed().isEmpty()) { log.warn("Producer: retrying sending " + batchResult.getFailed().size() + " messages"); for (int i = 0, n = batchResult.getFailed().size(); i < n; i++) { sqsClient.sendMessage(new SendMessageRequest(queueUrl, theMessage)); producedCount.incrementAndGet(); } } } } catch (AmazonClientException e) { /* * By default, AmazonSQSClient retries calls 3 times before * failing. If this unlikely condition occurs, stop. */ log.error("BatchProducer: " + e.getMessage()); System.exit(1); } } } /** * The consumer thread uses {@code ReceiveMessage} and {@code DeleteMessage} * to consume messages until it is stopped. */ private static class Consumer extends Thread { final AmazonSQS sqsClient; final String queueUrl; final AtomicInteger consumedCount; final AtomicBoolean stop; Consumer(AmazonSQS sqsClient, String queueUrl, AtomicInteger consumedCount, AtomicBoolean stop) { this.sqsClient = sqsClient; this.queueUrl = queueUrl; this.consumedCount = consumedCount; this.stop = stop; } /* * Each consumer thread receives and deletes messages until the main * thread stops the consumer thread. The consumedCount object tracks the * number of messages that are consumed by all consumer threads, and the * count is logged periodically. */ public void run() { try { while (!stop.get()) { try { final ReceiveMessageResult result = sqsClient .receiveMessage(new ReceiveMessageRequest(queueUrl)); if (!result.getMessages().isEmpty()) { final Message m = result.getMessages().get(0); sqsClient.deleteMessage(new DeleteMessageRequest(queueUrl, m.getReceiptHandle())); consumedCount.incrementAndGet(); } } catch (AmazonClientException e) { log.error(e.getMessage()); } } } catch (AmazonClientException e) { /* * By default, AmazonSQSClient retries calls 3 times before * failing. If this unlikely condition occurs, stop. */ log.error("Consumer: " + e.getMessage()); System.exit(1); } } } /** * The consumer thread uses {@code ReceiveMessage} and {@code * DeleteMessageBatch} to consume messages until it is stopped. */ private static class BatchConsumer extends Thread { final AmazonSQS sqsClient; final String queueUrl; final int batchSize; final AtomicInteger consumedCount; final AtomicBoolean stop; BatchConsumer(AmazonSQS sqsClient, String queueUrl, int batchSize, AtomicInteger consumedCount, AtomicBoolean stop) { this.sqsClient = sqsClient; this.queueUrl = queueUrl; this.batchSize = batchSize; this.consumedCount = consumedCount; this.stop = stop; } public void run() { try { while (!stop.get()) { final ReceiveMessageResult result = sqsClient .receiveMessage(new ReceiveMessageRequest(queueUrl) .withMaxNumberOfMessages(batchSize)); if (!result.getMessages().isEmpty()) { final List<Message> messages = result.getMessages(); final DeleteMessageBatchRequest batchRequest = new DeleteMessageBatchRequest() .withQueueUrl(queueUrl); final List<DeleteMessageBatchRequestEntry> entries = new ArrayList<DeleteMessageBatchRequestEntry>(); for (int i = 0, n = messages.size(); i < n; i++) entries.add(new DeleteMessageBatchRequestEntry() .withId(Integer.toString(i)) .withReceiptHandle(messages.get(i) .getReceiptHandle())); batchRequest.setEntries(entries); final DeleteMessageBatchResult batchResult = sqsClient .deleteMessageBatch(batchRequest); consumedCount.addAndGet(batchResult.getSuccessful().size()); /* * Because DeleteMessageBatch can return successfully, * but individual batch items fail, retry the failed * batch items. */ if (!batchResult.getFailed().isEmpty()) { final int n = batchResult.getFailed().size(); log.warn("Producer: retrying deleting " + n + " messages"); for (BatchResultErrorEntry e : batchResult .getFailed()) { sqsClient.deleteMessage( new DeleteMessageRequest(queueUrl, messages.get(Integer .parseInt(e.getId())) .getReceiptHandle())); consumedCount.incrementAndGet(); } } } } } catch (AmazonClientException e) { /* * By default, AmazonSQSClient retries calls 3 times before * failing. If this unlikely condition occurs, stop. */ log.error("BatchConsumer: " + e.getMessage()); System.exit(1); } } } /** * This thread prints every second the number of messages produced and * consumed so far. */ private static class Monitor extends Thread { private final AtomicInteger producedCount; private final AtomicInteger consumedCount; private final AtomicBoolean stop; Monitor(AtomicInteger producedCount, AtomicInteger consumedCount, AtomicBoolean stop) { this.producedCount = producedCount; this.consumedCount = consumedCount; this.stop = stop; } public void run() { try { while (!stop.get()) { Thread.sleep(1000); log.info("produced messages = " + producedCount.get() + ", consumed messages = " + consumedCount.get()); } } catch (InterruptedException e) { // Allow the thread to exit. } } } }

Monitorar métricas de volume da execução de exemplo

A Amazon gera SQS automaticamente métricas de volume para mensagens enviadas, recebidas e excluídas. Você pode acessar essas métricas e outras por meio da guia Monitoring (Monitoramento) de sua fila ou no console do CloudWatch .

nota

As métricas podem levar até 15 minutos após a fila começar para ficar disponíveis.