Transactional-Outbox-Muster - AWS Präskriptive Leitlinien

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Transactional-Outbox-Muster

Absicht

Das Transactional-Outbox-Muster löst das Problem der dualen Schreiboperationen, das in verteilten Systemen auftritt, wenn eine einzelne Operation sowohl eine Datenbank-Schreiboperation als auch eine Nachrichten- oder Ereignisbenachrichtigung beinhaltet. Ein dualer Schreibvorgang tritt auf, wenn eine Anwendung in zwei verschiedene Systeme schreibt, z. B. wenn ein Microservice Daten in der Datenbank speichern und eine Nachricht senden muss, um andere Systeme zu benachrichtigen. Ein Fehler bei einem dieser Vorgänge kann zu inkonsistenten Daten führen.

Motivation

Wenn ein Microservice nach einer Datenbankaktualisierung eine Ereignisbenachrichtigung sendet, sollten diese beiden Operationen atomar ablaufen, um Datenkonsistenz und Zuverlässigkeit zu gewährleisten.

  • Wenn die Datenbankaktualisierung erfolgreich ist, die Ereignisbenachrichtigung jedoch fehlschlägt, bemerkt der nachgelagerte Service die Änderung nicht und das System kann in einen inkonsistenten Zustand übergehen.

  • Wenn die Datenbankaktualisierung fehlschlägt, die Ereignisbenachrichtigung jedoch gesendet wird, können Daten beschädigt werden, was die Zuverlässigkeit des Systems beeinträchtigen kann.

Anwendbarkeit

Verwenden Sie das Transactional-Outbox-Muster, wenn:

  • Sie eine ereignisgesteuerte Anwendung erstellen, bei der eine Datenbankaktualisierung eine Ereignisbenachrichtigung auslöst.

  • Sie sollten die Atomizität bei Vorgängen sicherstellen, an denen zwei Services beteiligt sind.

  • Sie sollten das Event-Sourcing-Muster implementieren.

Fehler und Überlegungen

  • Doppelte Nachrichten: Der Ereignisverarbeitungsservice sendet möglicherweise doppelte Nachrichten oder Ereignisse aus. Daher empfehlen wir Ihnen, den konsumierenden Service idempotent zu machen, indem Sie die verarbeiteten Nachrichten verfolgen.

  • Reihenfolge der Benachrichtigungen: Senden Sie Nachrichten oder Ereignisse in derselben Reihenfolge, in der der Service die Datenbank aktualisiert. Dies ist für das Ereignis-Sourcing-Muster von entscheidender Bedeutung, bei dem Sie einen Ereignisspeicher für die point-in-time Wiederherstellung des Datenspeichers verwenden können. Wenn die Reihenfolge falsch ist, kann dies die Qualität der Daten beeinträchtigen. Ereigniskonsistenz und ein Rollback der Datenbank können das Problem noch verschärfen, wenn die Reihenfolge der Benachrichtigungen nicht beibehalten wird.

  • Transaktions-Rollback: Senden Sie keine Ereignisbenachrichtigung, wenn die Transaktion rückgängig gemacht wird.

  • Transaktionsverarbeitung auf Serviceebene: Wenn die Transaktion Services umfasst, für die Datenspeicher-Aktualisierungen erforderlich sind, verwenden Sie das Saga-Orchestrierungsmuster, um die Datenintegrität in allen Datenspeichern zu gewährleisten.

Implementierung

Hochrangige Architektur

Das folgende Sequenzdiagramm zeigt die Reihenfolge der Ereignisse, die bei dualen Schreibvorgängen auftreten.

Reihenfolge der Ereignisse bei dualen Schreibvorgängen
  1. Der Flugservice schreibt in die Datenbank und sendet eine Ereignisbenachrichtigung an den Zahlungsservice.

  2. Der Message Broker leitet die Nachrichten und Ereignisse an den Zahlungsservice weiter. Jeder Fehler im Message Broker verhindert, dass der Zahlungsservice die Updates empfängt.

Wenn die Aktualisierung der Flug-Datenbank fehlschlägt, die Benachrichtigung jedoch gesendet wird, verarbeitet der Zahlungsservice die Zahlung auf der Grundlage der Ereignisbenachrichtigung. Dies führt zu Inkonsistenzen der nachgelagerten Daten.

Implementierung mithilfe von AWS -Services

Um das Muster im Sequenzdiagramm zu demonstrieren, verwenden wir die folgenden AWS Services, wie im folgenden Diagramm gezeigt.

Transaktions-Outbox-Muster mit AWS Lambda, Amazon RDS und Amazon SQS

Wenn der Flugservice nach der Bestätigung der Transaktion fehlschlägt, kann dies dazu führen, dass die Ereignisbenachrichtigung nicht gesendet wird.

Transaktionsfehler nach dem Commit-Vorgang

Allerdings könnte die Transaktion fehlschlagen und zurückgesetzt werden, wobei die Ereignisbenachrichtigung dennoch gesendet wird, so dass der Zahlungsservice die Zahlung abwickeln kann.

Transaktionsfehler nach dem Commit-Vorgang mit Rollback

Um dieses Problem zu beheben, können Sie eine Outbox-Tabelle oder Change Data Capture (CDC) verwenden. In den folgenden Abschnitten werden diese beiden Optionen erörtert und erklärt, wie Sie diese mithilfe von AWS-Services implementieren können.

Verwenden einer Outbox-Tabelle mit einer relationalen Datenbank

In einer Outbox-Tabelle werden alle Ereignisse des Flugservices mit einem Zeitstempel und einer Sequenznummer gespeichert.

Wenn die Flugtabelle aktualisiert wird, wird auch die Outbox-Tabelle in derselben Transaktion aktualisiert. Ein anderer Service (z. B. der Eregnisverarbeitungsservice) liest aus der Outbox-Tabelle und sendet das Ereignis an Amazon SQS. Amazon SQS sendet eine Nachricht über das Ereignis an den Zahlungsservice zur weiteren Bearbeitung. Die Amazon-SQS-Standard-Warteschlangen garantieren, dass die Nachricht mindestens einmal zugestellt wird und nicht verloren geht. Wenn Sie jedoch Amazon-SQS-Standard-Warteschlangen verwenden, wird dieselbe Nachricht oder dasselbe Ereignis möglicherweise mehr als einmal zugestellt. Daher sollten Sie die Idempotenz des Ereignisbenachrichtigungsservices sicherstellen (d. h., die mehrfache Verarbeitung derselben Nachricht sollte keine negativen Auswirkungen haben). Wenn Sie möchten, dass die Nachricht genau einmal mit der Nachrichtenreihenfolge zugestellt wird, können Sie Amazon-SQS-Warteschlangen mit FIFO (first in, first out) verwenden.

Schlägt die Aktualisierung der Flugtabelle oder die Aktualisierung der Ausgangstabelle fehl, wird die gesamte Transaktion zurückgesetzt, sodass keine Inkonsistenzen bei nachgelagerten Daten auftreten.

Rollback ohne Inkonsistenzen bei nachgelagerten Daten

In der folgenden Abbildung wird die Transactional-Outbox-Architektur mithilfe einer Amazon RDS-Datenbank implementiert. Wenn der Ereignisverarbeitungsservice die Outbox-Tabelle liest, erkennt er nur die Zeilen, die Teil einer bestätigten (erfolgreichen) Transaktion sind, und stellt dann die Nachricht für das Ereignis in die SQS-Warteschlange, die vom Zahlungsservice zur weiteren Verarbeitung gelesen wird. Dieses Design löst das Problem der doppelten Schreiboperationen und bewahrt die Reihenfolge der Nachrichten und Ereignisse durch die Verwendung von Zeitstempeln und Sequenznummern.

Design, das Probleme mit dualen Schreibvorgängen behebt

Verwenden von Change Data Capture (CDC)

Einige Datenbanken unterstützen die Veröffentlichung von Änderungen auf Elementebene, um geänderte Daten zu erfassen. Sie können die geänderten Elemente identifizieren und eine entsprechende Ereignisbenachrichtigung senden. Dies erspart den Aufwand, eine weitere Tabelle zur Nachverfolgung der Aktualisierungen zu erstellen. Das vom Flugservice ausgelöste Ereignis wird in einem anderen Attribut desselben Elements gespeichert.

Amazon DynamoDB ist eine NoSQL-Schlüsselwert-Datenbank, die CDC-Updates unterstützt. Im folgenden Sequenzdiagramm veröffentlicht DynamoDB Änderungen auf Elementebene an Amazon DynamoDB Streams. Der Ereignisverarbeitungsservice liest aus den Streams und veröffentlicht die Ereignismeldung an den Zahlungsservice zur weiteren Verarbeitung.

Transactional Outbox mit DynamoDB und DynamoDB Streams

DynamoDB Streams erfasst den Informationsfluss in Bezug auf Änderungen auf Elementebene in einer DynamoDB-Tabelle mithilfe einer zeitlich geordneten Reihenfolge.

Sie können ein Transactional-Outbox-Muster implementieren, indem Sie Streams in der DynamoDB-Tabelle aktivieren. Die Lambda-Funktion für den Ereignisverarbeitungsservice ist diesen Streams zugeordnet.

  • Wenn die Flugtabelle aktualisiert wird, werden die geänderten Daten von DynamoDB Streams erfasst, und der Ereignisverarbeitungsservice fragt den Stream nach neuen Datensätzen ab.

  • Wenn neue Stream-Datensätze verfügbar werden, platziert die Lambda-Funktion die Nachricht für das Ereignis synchron zur weiteren Verarbeitung in die SQS-Warteschlange. Sie können dem DynamoDB-Element ein Attribut hinzufügen, um den Zeitstempel und die Sequenznummer nach Bedarf zu erfassen, um die Robustheit der Implementierung zu verbessern.

Transactional Outbox mit CDC

Beispiel-Code

Verwenden einer Outbox-Tabelle

Der Beispielcode in diesem Abschnitt zeigt, wie Sie das transaktionale Outbox-Muster mithilfe einer Outbox-Tabelle implementieren können. Den vollständigen Code finden Sie im GitHubRepository für dieses Beispiel.

Der folgende Codeausschnitt speichert die Flight-Entität und das Flight-Ereignis in der Datenbank in ihren jeweiligen Tabellen innerhalb einer einzigen Transaktion.

@PostMapping("/flights") @Transactional public Flight createFlight(@Valid @RequestBody Flight flight) { Flight savedFlight = flightRepository.save(flight); JsonNode flightPayload = objectMapper.convertValue(flight, JsonNode.class); FlightOutbox outboxEvent = new FlightOutbox(flight.getId().toString(), FlightOutbox.EventType.FLIGHT_BOOKED, flightPayload); outboxRepository.save(outboxEvent); return savedFlight; }

Ein separater Service ist dafür zuständig, die Outbox-Tabelle regelmäßig nach neuen Ereignissen zu durchsuchen, sie an Amazon SQS zu senden und sie aus der Tabelle zu löschen, wenn Amazon SQS erfolgreich antwortet. Die Abfragerate ist in der application.properties-Datei konfigurierbar.

@Scheduled(fixedDelayString = "${sqs.polling_ms}") public void forwardEventsToSQS() { List<FlightOutbox> entities = outboxRepository.findAllByOrderByIdAsc(Pageable.ofSize(batchSize)).toList(); if (!entities.isEmpty()) { GetQueueUrlRequest getQueueRequest = GetQueueUrlRequest.builder() .queueName(sqsQueueName) .build(); String queueUrl = this.sqsClient.getQueueUrl(getQueueRequest).queueUrl(); List<SendMessageBatchRequestEntry> messageEntries = new ArrayList<>(); entities.forEach(entity -> messageEntries.add(SendMessageBatchRequestEntry.builder() .id(entity.getId().toString()) .messageGroupId(entity.getAggregateId()) .messageDeduplicationId(entity.getId().toString()) .messageBody(entity.getPayload().toString()) .build()) ); SendMessageBatchRequest sendMessageBatchRequest = SendMessageBatchRequest.builder() .queueUrl(queueUrl) .entries(messageEntries) .build(); sqsClient.sendMessageBatch(sendMessageBatchRequest); outboxRepository.deleteAllInBatch(entities); } }

Verwenden von Change Data Capture (CDC)

Der Beispielcode in diesem Abschnitt zeigt, wie Sie das transaktionale Outbox-Muster mithilfe der CDC-Funktionen (Change Data Capture) von DynamoDB implementieren können. Den vollständigen Code finden Sie im GitHubRepository für dieses Beispiel.

Der folgende AWS Cloud Development Kit (AWS CDK) Codeausschnitt erstellt eine DynamoDB-Flottentabelle und einen Amazon Kinesis Data Stream (cdcStream) und konfiguriert die Flugtabelle so, dass alle ihre Aktualisierungen an den Stream gesendet werden.

Const cdcStream = new kinesis.Stream(this, 'flightsCDCStream', { streamName: 'flightsCDCStream' }) const flightTable = new dynamodb.Table(this, 'flight', { tableName: 'flight', kinesisStream: cdcStream, partitionKey: { name: 'id', type: dynamodb.AttributeType.STRING, } });

Der folgende Codeausschnitt und die folgende Konfiguration definieren eine Funktion für den Spring-Cloud-Stream, die die Updates im Kinesis-Stream aufnimmt und diese Ereignisse zur weiteren Verarbeitung an eine SQS-Warteschlange weiterleitet.

applications.properties spring.cloud.stream.bindings.sendToSQS-in-0.destination=${kinesisstreamname} spring.cloud.stream.bindings.sendToSQS-in-0.content-type=application/ddb QueueService.java @Bean public Consumer<Flight> sendToSQS() { return this::forwardEventsToSQS; } public void forwardEventsToSQS(Flight flight) { GetQueueUrlRequest getQueueRequest = GetQueueUrlRequest.builder() .queueName(sqsQueueName) .build(); String queueUrl = this.sqsClient.getQueueUrl(getQueueRequest).queueUrl(); try { SendMessageRequest send_msg_request = SendMessageRequest.builder() .queueUrl(queueUrl) .messageBody(objectMapper.writeValueAsString(flight)) .messageGroupId("1") .messageDeduplicationId(flight.getId().toString()) .build(); sqsClient.sendMessage(send_msg_request); } catch (IOException | AmazonServiceException e) { logger.error("Error sending message to SQS", e); } }

GitHub -Repository

Eine vollständige Implementierung der Beispielarchitektur für dieses Muster finden Sie im GitHub Repository unter https://github.com/aws-samples/transactional-outbox-pattern.