Steigerung des Durchsatzes durch horizontale Skalierung und Action-Batching mit Amazon SQS - Amazon Simple Queue Service

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Steigerung des Durchsatzes durch horizontale Skalierung und Action-Batching mit Amazon SQS

SQSAmazon-Warteschlangen können einen sehr hohen Durchsatz liefern. Weitere Informationen zu Durchsatzkontingenten finden Sie unter SQSAmazon-Nachrichtenkontingente.

Um einen hohen Durchsatz zu erreichen, müssen Sie Nachrichtenproduzenten und -konsumenten horizontal skalieren (durch Hinzufügen weiterer Produzenten und Konsumenten).

Horizontale Skalierung

Da Sie SQS über ein HTTP Request-Response-Protokoll auf Amazon zugreifen, begrenzt die Anforderungslatenz (das Intervall zwischen dem Initiieren einer Anfrage und dem Empfangen einer Antwort) den Durchsatz, den Sie mit einem einzelnen Thread mit einer einzigen Verbindung erreichen können. Wenn beispielsweise die Latenz von einem EC2 Amazon-Client zu Amazon SQS in derselben Region durchschnittlich 20 ms beträgt, beträgt der maximale Durchsatz von einem einzelnen Thread über eine einzelne Verbindung durchschnittlich TPS 50.

Horizontale Skalierung bedeutet, dass die Anzahl Ihrer Nachrichtenproduzenten (die SendMessage-Anforderungen erstellen) und Konsumenten (die ReceiveMessage- und DeleteMessage-Anforderungen erstellen) erhöht wird, um den Gesamtdurchsatz der Warteschlange zu steigern. Sie können auf drei Arten horizontal skalieren:

  • Erhöhen der Anzahl der Threads pro Client

  • Hinzufügen weiterer Clients

  • Erhöhen der Anzahl der Threads pro Client und Hinzufügen weiterer Clients

Wenn Sie weitere Clients hinzufügen, erzielen Sie eine wesentliche lineare Steigerung des Durchsatzes der Warteschlange. Wenn Sie die Anzahl der Clients beispielsweise verdoppeln, verdoppeln Sie auch den Durchsatz.

Anmerkung

Stellen Sie bei der horizontalen Skalierung sicher, dass Ihr SQS Amazon-Client über genügend Verbindungen oder Threads verfügt, um die Anzahl der gleichzeitigen Nachrichtenproduzenten und -verbraucher zu unterstützen, die Anfragen senden und Antworten erhalten. Beispielsweise unterhalten Instances der AWS SDK for Java AmazonSQSClient Klasse standardmäßig höchstens 50 Verbindungen zu AmazonSQS. Um zusätzliche gleichzeitige Produzenten und Konsumenten zu erstellen, müssen Sie die maximal zulässige Anzahl von Produzenten- und Konsumenten-Threads für ein AmazonSQSClientBuilder-Objekt anpassen, z. B.:

final AmazonSQS sqsClient = AmazonSQSClientBuilder.standard() .withClientConfiguration(new ClientConfiguration() .withMaxConnections(producerCount + consumerCount)) .build();

Für AmazonSQSAsyncClient müssen Sie außerdem sicherstellen, dass ausreichend Threads verfügbar sind.

Dieses Beispiel funktioniert nur für Java v. 1.x.

Stapelverarbeitung von Aktionen

Mit der Stapelverarbeitung wird in den einzelnen Roundtrips an den Service mehr Arbeit ausgeführt (z. B. das Senden mehrerer Nachrichten mit einer einzelnen SendMessageBatch-Anforderung). Die SQS Amazon-Batch-Aktionen sindSendMessageBatch,DeleteMessageBatch, undChangeMessageVisibilityBatch. Um die Vorteile des Batchings zu nutzen, ohne Ihre Produzenten oder Verbraucher zu wechseln, können Sie den Amazon SQS Buffered Asynchronous Client verwenden.

Anmerkung

Da mit ReceiveMessage 10 Nachrichten gleichzeitig verarbeitet werden können, wird keine ReceiveMessageBatch-Aktion ausgeführt.

Die Stapelverarbeitung verteilt die Latenz der Stapelaktion über mehrere Nachrichten in einer Stapelanforderung, anstatt die gesamte Latenz für eine einzelne Nachricht (z. B. eine SendMessage-Anforderung) zu akzeptieren. Da jeder Roundtrip mehr Arbeit verrichtet, nutzen Stapelanforderungen Threads und Verbindungen effektiver und verbessern somit den Durchsatz.

Sie können die Stapelverarbeitung mit der horizontalen Skalierung kombinieren, um einen Durchsatz mit weniger Threads, Verbindungen und Anforderungen zu bieten, als dies bei einzelnen Nachrichtenanforderungen der Fall ist. Sie können gestapelte SQS Amazon-Aktionen verwenden, um bis zu 10 Nachrichten gleichzeitig zu senden, zu empfangen oder zu löschen. Da Amazon die SQS Gebühren auf Anfrage berechnet, kann die Stapelverarbeitung Ihre Kosten erheblich senken.

Mit der Stapelverarbeitung kann eine gewisse Komplexität für Ihre Anwendung einhergehen (z. B. muss die Anwendung Nachrichten vor der Übermittlung sammeln oder gelegentlich länger auf eine Antwort warten). Die Stapelverarbeitung kann in folgenden Fällen jedoch weiterhin effektiv sein:

  • Ihre Anwendung erstellt in kurzer Zeit viele Nachrichten, sodass es niemals zu einer sehr langen Verzögerung kommt.

  • Anders als typische Nachrichtenproduzenten, die Nachrichten als Antwort auf Ereignisse senden müssen, die sie nicht steuern können, ruft ein Nachrichtenkonsument Nachrichten nach eigenem Ermessen aus einer Warteschlange ab.

Wichtig

Eine Stapelanforderung kann erfolgreich ausgeführt werden, auch wenn einzelne Nachrichten im Stapel fehlgeschlagen sind. Überprüfen Sie nach einer Stapelanforderung stets, ob einzelne Nachrichten nicht zugestellt werden konnten, und führen Sie diese bei Bedarf erneut aus.

Funktionierendes Java-Beispiel für einzelne Operationsanforderungen und Stapelanforderungen

Voraussetzungen

Fügen Sie dem Pfad für Ihre Java-Build-Klasse die Pakete aws-java-sdk-sqs.jar, aws-java-sdk-ec2.jar und commons-logging.jar hinzu. Das folgende Beispiel zeigt diese Abhängigkeiten in der pom.xml-Datei eines Maven-Projekts.

<dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-sqs</artifactId> <version>LATEST</version> </dependency> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-ec2</artifactId> <version>LATEST</version> </dependency> <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>LATEST</version> </dependency> </dependencies>

SimpleProducerConsumer.java

Das nachstehende Java-Code-Beispiel veranschaulicht ein einfaches Produzent-Konsument-Muster. Der Haupt-Thread ruft eine Reihe von Produzenten- und Konsumenten-Threads auf, die 1-KB-Nachrichten für eine bestimmte Zeit verarbeiten. Dieses Beispiel enthält Produzenten und Konsumenten, die einzelne Operationsanforderungen senden, und solche, die Stapelanforderungen senden.

/* * Copyright 2010-2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * https://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. * */ import com.amazonaws.AmazonClientException; import com.amazonaws.ClientConfiguration; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.model.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import java.math.BigInteger; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.Scanner; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** * Start a specified number of producer and consumer threads, and produce-consume * for the least of the specified duration and 1 hour. Some messages can be left * in the queue because producers and consumers might not be in exact balance. */ public class SimpleProducerConsumer { // The maximum runtime of the program. private final static int MAX_RUNTIME_MINUTES = 60; private final static Log log = LogFactory.getLog(SimpleProducerConsumer.class); public static void main(String[] args) throws InterruptedException { final Scanner input = new Scanner(System.in); System.out.print("Enter the queue name: "); final String queueName = input.nextLine(); System.out.print("Enter the number of producers: "); final int producerCount = input.nextInt(); System.out.print("Enter the number of consumers: "); final int consumerCount = input.nextInt(); System.out.print("Enter the number of messages per batch: "); final int batchSize = input.nextInt(); System.out.print("Enter the message size in bytes: "); final int messageSizeByte = input.nextInt(); System.out.print("Enter the run time in minutes: "); final int runTimeMinutes = input.nextInt(); /* * Create a new instance of the builder with all defaults (credentials * and region) set automatically. For more information, see Creating * Service Clients in the AWS SDK for Java Developer Guide. */ final ClientConfiguration clientConfiguration = new ClientConfiguration() .withMaxConnections(producerCount + consumerCount); final AmazonSQS sqsClient = AmazonSQSClientBuilder.standard() .withClientConfiguration(clientConfiguration) .build(); final String queueUrl = sqsClient .getQueueUrl(new GetQueueUrlRequest(queueName)).getQueueUrl(); // The flag used to stop producer, consumer, and monitor threads. final AtomicBoolean stop = new AtomicBoolean(false); // Start the producers. final AtomicInteger producedCount = new AtomicInteger(); final Thread[] producers = new Thread[producerCount]; for (int i = 0; i < producerCount; i++) { if (batchSize == 1) { producers[i] = new Producer(sqsClient, queueUrl, messageSizeByte, producedCount, stop); } else { producers[i] = new BatchProducer(sqsClient, queueUrl, batchSize, messageSizeByte, producedCount, stop); } producers[i].start(); } // Start the consumers. final AtomicInteger consumedCount = new AtomicInteger(); final Thread[] consumers = new Thread[consumerCount]; for (int i = 0; i < consumerCount; i++) { if (batchSize == 1) { consumers[i] = new Consumer(sqsClient, queueUrl, consumedCount, stop); } else { consumers[i] = new BatchConsumer(sqsClient, queueUrl, batchSize, consumedCount, stop); } consumers[i].start(); } // Start the monitor thread. final Thread monitor = new Monitor(producedCount, consumedCount, stop); monitor.start(); // Wait for the specified amount of time then stop. Thread.sleep(TimeUnit.MINUTES.toMillis(Math.min(runTimeMinutes, MAX_RUNTIME_MINUTES))); stop.set(true); // Join all threads. for (int i = 0; i < producerCount; i++) { producers[i].join(); } for (int i = 0; i < consumerCount; i++) { consumers[i].join(); } monitor.interrupt(); monitor.join(); } private static String makeRandomString(int sizeByte) { final byte[] bs = new byte[(int) Math.ceil(sizeByte * 5 / 8)]; new Random().nextBytes(bs); bs[0] = (byte) ((bs[0] | 64) & 127); return new BigInteger(bs).toString(32); } /** * The producer thread uses {@code SendMessage} * to send messages until it is stopped. */ private static class Producer extends Thread { final AmazonSQS sqsClient; final String queueUrl; final AtomicInteger producedCount; final AtomicBoolean stop; final String theMessage; Producer(AmazonSQS sqsQueueBuffer, String queueUrl, int messageSizeByte, AtomicInteger producedCount, AtomicBoolean stop) { this.sqsClient = sqsQueueBuffer; this.queueUrl = queueUrl; this.producedCount = producedCount; this.stop = stop; this.theMessage = makeRandomString(messageSizeByte); } /* * The producedCount object tracks the number of messages produced by * all producer threads. If there is an error, the program exits the * run() method. */ public void run() { try { while (!stop.get()) { sqsClient.sendMessage(new SendMessageRequest(queueUrl, theMessage)); producedCount.incrementAndGet(); } } catch (AmazonClientException e) { /* * By default, AmazonSQSClient retries calls 3 times before * failing. If this unlikely condition occurs, stop. */ log.error("Producer: " + e.getMessage()); System.exit(1); } } } /** * The producer thread uses {@code SendMessageBatch} * to send messages until it is stopped. */ private static class BatchProducer extends Thread { final AmazonSQS sqsClient; final String queueUrl; final int batchSize; final AtomicInteger producedCount; final AtomicBoolean stop; final String theMessage; BatchProducer(AmazonSQS sqsQueueBuffer, String queueUrl, int batchSize, int messageSizeByte, AtomicInteger producedCount, AtomicBoolean stop) { this.sqsClient = sqsQueueBuffer; this.queueUrl = queueUrl; this.batchSize = batchSize; this.producedCount = producedCount; this.stop = stop; this.theMessage = makeRandomString(messageSizeByte); } public void run() { try { while (!stop.get()) { final SendMessageBatchRequest batchRequest = new SendMessageBatchRequest().withQueueUrl(queueUrl); final List<SendMessageBatchRequestEntry> entries = new ArrayList<SendMessageBatchRequestEntry>(); for (int i = 0; i < batchSize; i++) entries.add(new SendMessageBatchRequestEntry() .withId(Integer.toString(i)) .withMessageBody(theMessage)); batchRequest.setEntries(entries); final SendMessageBatchResult batchResult = sqsClient.sendMessageBatch(batchRequest); producedCount.addAndGet(batchResult.getSuccessful().size()); /* * Because SendMessageBatch can return successfully, but * individual batch items fail, retry the failed batch items. */ if (!batchResult.getFailed().isEmpty()) { log.warn("Producer: retrying sending " + batchResult.getFailed().size() + " messages"); for (int i = 0, n = batchResult.getFailed().size(); i < n; i++) { sqsClient.sendMessage(new SendMessageRequest(queueUrl, theMessage)); producedCount.incrementAndGet(); } } } } catch (AmazonClientException e) { /* * By default, AmazonSQSClient retries calls 3 times before * failing. If this unlikely condition occurs, stop. */ log.error("BatchProducer: " + e.getMessage()); System.exit(1); } } } /** * The consumer thread uses {@code ReceiveMessage} and {@code DeleteMessage} * to consume messages until it is stopped. */ private static class Consumer extends Thread { final AmazonSQS sqsClient; final String queueUrl; final AtomicInteger consumedCount; final AtomicBoolean stop; Consumer(AmazonSQS sqsClient, String queueUrl, AtomicInteger consumedCount, AtomicBoolean stop) { this.sqsClient = sqsClient; this.queueUrl = queueUrl; this.consumedCount = consumedCount; this.stop = stop; } /* * Each consumer thread receives and deletes messages until the main * thread stops the consumer thread. The consumedCount object tracks the * number of messages that are consumed by all consumer threads, and the * count is logged periodically. */ public void run() { try { while (!stop.get()) { try { final ReceiveMessageResult result = sqsClient .receiveMessage(new ReceiveMessageRequest(queueUrl)); if (!result.getMessages().isEmpty()) { final Message m = result.getMessages().get(0); sqsClient.deleteMessage(new DeleteMessageRequest(queueUrl, m.getReceiptHandle())); consumedCount.incrementAndGet(); } } catch (AmazonClientException e) { log.error(e.getMessage()); } } } catch (AmazonClientException e) { /* * By default, AmazonSQSClient retries calls 3 times before * failing. If this unlikely condition occurs, stop. */ log.error("Consumer: " + e.getMessage()); System.exit(1); } } } /** * The consumer thread uses {@code ReceiveMessage} and {@code * DeleteMessageBatch} to consume messages until it is stopped. */ private static class BatchConsumer extends Thread { final AmazonSQS sqsClient; final String queueUrl; final int batchSize; final AtomicInteger consumedCount; final AtomicBoolean stop; BatchConsumer(AmazonSQS sqsClient, String queueUrl, int batchSize, AtomicInteger consumedCount, AtomicBoolean stop) { this.sqsClient = sqsClient; this.queueUrl = queueUrl; this.batchSize = batchSize; this.consumedCount = consumedCount; this.stop = stop; } public void run() { try { while (!stop.get()) { final ReceiveMessageResult result = sqsClient .receiveMessage(new ReceiveMessageRequest(queueUrl) .withMaxNumberOfMessages(batchSize)); if (!result.getMessages().isEmpty()) { final List<Message> messages = result.getMessages(); final DeleteMessageBatchRequest batchRequest = new DeleteMessageBatchRequest() .withQueueUrl(queueUrl); final List<DeleteMessageBatchRequestEntry> entries = new ArrayList<DeleteMessageBatchRequestEntry>(); for (int i = 0, n = messages.size(); i < n; i++) entries.add(new DeleteMessageBatchRequestEntry() .withId(Integer.toString(i)) .withReceiptHandle(messages.get(i) .getReceiptHandle())); batchRequest.setEntries(entries); final DeleteMessageBatchResult batchResult = sqsClient .deleteMessageBatch(batchRequest); consumedCount.addAndGet(batchResult.getSuccessful().size()); /* * Because DeleteMessageBatch can return successfully, * but individual batch items fail, retry the failed * batch items. */ if (!batchResult.getFailed().isEmpty()) { final int n = batchResult.getFailed().size(); log.warn("Producer: retrying deleting " + n + " messages"); for (BatchResultErrorEntry e : batchResult .getFailed()) { sqsClient.deleteMessage( new DeleteMessageRequest(queueUrl, messages.get(Integer .parseInt(e.getId())) .getReceiptHandle())); consumedCount.incrementAndGet(); } } } } } catch (AmazonClientException e) { /* * By default, AmazonSQSClient retries calls 3 times before * failing. If this unlikely condition occurs, stop. */ log.error("BatchConsumer: " + e.getMessage()); System.exit(1); } } } /** * This thread prints every second the number of messages produced and * consumed so far. */ private static class Monitor extends Thread { private final AtomicInteger producedCount; private final AtomicInteger consumedCount; private final AtomicBoolean stop; Monitor(AtomicInteger producedCount, AtomicInteger consumedCount, AtomicBoolean stop) { this.producedCount = producedCount; this.consumedCount = consumedCount; this.stop = stop; } public void run() { try { while (!stop.get()) { Thread.sleep(1000); log.info("produced messages = " + producedCount.get() + ", consumed messages = " + consumedCount.get()); } } catch (InterruptedException e) { // Allow the thread to exit. } } } }

Überwachen von Volume-Metriken aus der Beispielausführung

Amazon generiert SQS automatisch Volumenmetriken für gesendete, empfangene und gelöschte Nachrichten. Auf diese und andere Metriken können Sie über die Registerkarte Monitoring (Überwachung) für Ihre Warteschlange oder in der CloudWatch -Konsole zugreifen.

Anmerkung

Nach dem Starten der Warteschlange kann es bis zu 15 Minuten dauern, bis die Metriken verfügbar sind.