Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Modello transazionale di posta in uscita
Intento
Il modello transazionale di posta in uscita risolve il problema delle operazioni di doppia scrittura che si verifica nei sistemi distribuiti quando una singola operazione comporta sia un'operazione di scrittura del database che una notifica di messaggio o evento. Un'operazione di doppia scrittura si verifica quando un'applicazione scrive su due sistemi diversi; ad esempio, quando un microservizio deve mantenere i dati nel database e inviare un messaggio per notificare altri sistemi. Un errore in una di queste operazioni potrebbe causare dati non coerenti.
Motivazione
Quando un microservizio invia una notifica di evento dopo un aggiornamento del database, queste due operazioni devono essere eseguite in modo atomico per garantire la coerenza e l'affidabilità dei dati.
-
Se l'aggiornamento del database ha esito positivo ma la notifica dell'evento non riesce, il servizio a valle non verrà a conoscenza della modifica e il sistema potrebbe entrare in uno stato incoerente.
-
Se l'aggiornamento del database non riesce ma viene inviata la notifica dell'evento, i dati potrebbero danneggiarsi, il che potrebbe influire sull'affidabilità del sistema.
Applicabilità
Utilizza il modello di posta in uscita transazionale quando:
-
Stai creando un'applicazione basata sugli eventi in cui un aggiornamento del database avvia una notifica di evento.
-
Vuoi garantire l'atomicità nelle operazioni che coinvolgono due servizi.
-
Desideri implementare il modello di approvvigionamento degli eventi.
Problemi e considerazioni
-
Messaggi duplicati: il servizio di elaborazione degli eventi potrebbe inviare messaggi o eventi duplicati, pertanto ti consigliamo di rendere idempotente il servizio di consumo monitorando i messaggi elaborati.
-
Ordine di notifica: invia messaggi o eventi nello stesso ordine in cui il servizio aggiorna il database. Questo è fondamentale per il modello di approvvigionamento degli eventi in cui è possibile utilizzare un archivio eventi per il point-in-time ripristino del data store. Se l'ordine non è corretto, potrebbe compromettere la qualità dei dati. L'eventuale coerenza e il rollback del database possono aggravare il problema se l'ordine delle notifiche non viene mantenuto.
-
Ripristino delle transazioni: non inviare una notifica di evento se la transazione viene ripristinata.
-
Gestione delle transazioni a livello di servizio: se la transazione include servizi che richiedono aggiornamenti dei datastore, utilizza il modello di orchestrazione Saga per preservare l'integrità dei dati nei datastore.
Implementazione
Architettura di alto livello
Il seguente diagramma di sequenza mostra l'ordine degli eventi che si verificano durante le operazioni di doppia scrittura.
-
Il servizio in corso scrive nel database e invia una notifica di evento al servizio di pagamento.
-
Il broker di messaggi trasmette i messaggi e gli eventi al servizio di pagamento. Qualsiasi errore nel broker di messaggi impedisce al servizio di pagamento di ricevere gli aggiornamenti.
Se l'aggiornamento del database in corso non riesce ma la notifica viene inviata, il servizio di pagamento elaborerà il pagamento in base alla notifica dell'evento. Ciò causerà incongruenze nei dati a valle.
Implementazione tramite servizi AWS
Per illustrare lo schema nel diagramma di sequenza, utilizzeremo i seguenti AWS servizi, come illustrato nel diagramma seguente.
-
I microservizi vengono implementati utilizzando AWS Lambda
. -
Il database primario è gestito da Amazon Relational Database Service (Amazon RDS)
. -
Amazon Simple Queue Service (Amazon SQS
) funge da broker di messaggi che riceve le notifiche degli eventi.
Se il servizio in corso fallisce dopo aver effettuato la transazione, ciò potrebbe comportare il mancato invio della notifica dell'evento.
Tuttavia, la transazione potrebbe fallire e tornare indietro, ma la notifica dell'evento potrebbe comunque essere inviata, causando l'elaborazione del pagamento da parte del servizio di pagamento.
Per risolvere questo problema, puoi utilizzare una tabella di posta in uscita o la funzionalità CDC (Change Data Capture). Le sezioni seguenti illustrano queste due opzioni e come implementarle utilizzando i servizi AWS.
Utilizzo di una tabella di posta in uscita con un database relazionale
Una tabella di posta in uscita memorizza tutti gli eventi del servizio in corso con un timestamp e un numero di sequenza.
Quando la tabella in corso viene aggiornata, anche la tabella di posta in uscita viene aggiornata nella stessa transazione. Un altro servizio (ad esempio, il servizio di elaborazione degli eventi) legge dalla tabella di posta in uscita e invia l'evento ad Amazon SQS. Amazon SQS invia un messaggio sull'evento al servizio di pagamento per un'ulteriore elaborazione. Le code standard di Amazon SQS garantiscono che il messaggio venga recapitato almeno una volta e non vada perso. Tuttavia, quando utilizzi le code standard di Amazon SQS, lo stesso messaggio o evento potrebbe essere recapitato più di una volta, quindi dovresti assicurarti che il servizio di notifica degli eventi sia idempotente (ovvero che l'elaborazione dello stesso messaggio più volte non abbia effetti negativi). Se desideri che il messaggio venga recapitato esattamente una sola volta, con l'ordinamento dei messaggi, puoi utilizzare le code FIFO (first in, first out) di Amazon SQS.
Se l'aggiornamento della tabella in corso non riesce o l'aggiornamento della tabella di posta in uscita non riesce, l'intera transazione viene ripristinata, quindi non ci sono incongruenze nei dati a valle.
Nel diagramma seguente, l'architettura transazionale della posta in uscita viene implementata utilizzando un database Amazon RDS. Quando il servizio di elaborazione degli eventi legge la tabella dei messaggi in uscita, riconosce solo le righe che fanno parte di una transazione confermata (riuscita), quindi inserisce il messaggio relativo all'evento nella coda SQS, che viene letta dal servizio di pagamento per un'ulteriore elaborazione. Questo comportamento risolve il problema delle operazioni di doppia scrittura e mantiene l'ordine dei messaggi e degli eventi utilizzando timestamp e numeri di sequenza.
Utilizzo dell'acquisizione dei dati di modifica (CDC)
Alcuni database supportano la pubblicazione di modifiche a livello di elemento per acquisire i dati modificati. È possibile identificare gli elementi modificati e inviare di conseguenza una notifica di evento. Ciò consente di risparmiare il sovraccarico dovuto alla creazione di un'altra tabella per tenere traccia degli aggiornamenti. L'evento avviato dal servizio in corso viene memorizzato in un altro attributo dello stesso elemento.
Amazon DynamoDB
I flussi DynamoDB acquisiscono il flusso di informazioni relative alle modifiche a livello di elemento in una tabella DynamoDB utilizzando una sequenza ordinata nel tempo.
È possibile implementare un modello di posta in uscita transazionale abilitando i flussi sulla tabella DynamoDB. La funzione Lambda per il servizio di elaborazione degli eventi è associata a questi flussi.
-
Quando la tabella in corso viene aggiornata, i dati modificati vengono acquisiti dai flussi DynamoDB e il servizio di elaborazione degli eventi analizza il flusso alla ricerca di nuovi record.
-
Quando diventano disponibili nuovi record di flusso, la funzione Lambda inserisce in modo sincrono il messaggio per l'evento nella coda SQS per un'ulteriore elaborazione. È possibile aggiungere un attributo all'elemento DynamoDB per acquisire il timestamp e il numero di sequenza necessari per migliorare la robustezza dell'implementazione.
Codice di esempio
Utilizzo di una tabella di posta in uscita
Il codice di esempio in questa sezione mostra come implementare il modello di posta in uscita transazionale utilizzando una tabella di posta in uscita. Per visualizzare il codice completo, consulta il GitHubrepository
Il seguente frammento di codice salva l'entità Flight
e l'evento Flight
nel database nelle rispettive tabelle all'interno di una singola transazione.
@PostMapping("/flights") @Transactional public Flight createFlight(@Valid @RequestBody Flight flight) { Flight savedFlight = flightRepository.save(flight); JsonNode flightPayload = objectMapper.convertValue(flight, JsonNode.class); FlightOutbox outboxEvent = new FlightOutbox(flight.getId().toString(), FlightOutbox.EventType.FLIGHT_BOOKED, flightPayload); outboxRepository.save(outboxEvent); return savedFlight; }
Un servizio separato si occupa di scansionare regolarmente la tabella in uscita alla ricerca di nuovi eventi, inviarli ad Amazon SQS ed eliminarli dalla tabella se Amazon SQS risponde correttamente. La frequenza di polling è configurabile nel file application.properties
.
@Scheduled(fixedDelayString = "${sqs.polling_ms}") public void forwardEventsToSQS() { List<FlightOutbox> entities = outboxRepository.findAllByOrderByIdAsc(Pageable.ofSize(batchSize)).toList(); if (!entities.isEmpty()) { GetQueueUrlRequest getQueueRequest = GetQueueUrlRequest.builder() .queueName(sqsQueueName) .build(); String queueUrl = this.sqsClient.getQueueUrl(getQueueRequest).queueUrl(); List<SendMessageBatchRequestEntry> messageEntries = new ArrayList<>(); entities.forEach(entity -> messageEntries.add(SendMessageBatchRequestEntry.builder() .id(entity.getId().toString()) .messageGroupId(entity.getAggregateId()) .messageDeduplicationId(entity.getId().toString()) .messageBody(entity.getPayload().toString()) .build()) ); SendMessageBatchRequest sendMessageBatchRequest = SendMessageBatchRequest.builder() .queueUrl(queueUrl) .entries(messageEntries) .build(); sqsClient.sendMessageBatch(sendMessageBatchRequest); outboxRepository.deleteAllInBatch(entities); } }
Utilizzo dell'acquisizione dei dati di modifica (CDC)
Il codice di esempio in questa sezione mostra come implementare il pattern transactional outbox utilizzando le funzionalità Change Data Capture (CDC) di DynamoDB. Per visualizzare il codice completo, consulta il repository di questo esempio. GitHub
Il seguente frammento di AWS Cloud Development Kit (AWS CDK)
codice crea una tabella di volo DynamoDB e un flusso di dati Amazon Kinesis (cdcStream
) e configura la tabella di volo per inviare tutti i relativi aggiornamenti allo stream.
Const cdcStream = new kinesis.Stream(this, 'flightsCDCStream', { streamName: 'flightsCDCStream' }) const flightTable = new dynamodb.Table(this, 'flight', { tableName: 'flight', kinesisStream: cdcStream, partitionKey: { name: 'id', type: dynamodb.AttributeType.STRING, } });
Il frammento di codice e la configurazione seguenti definiscono una funzione Spring Cloud Stream che raccoglie gli aggiornamenti nel flusso Kinesis e inoltra questi eventi a una coda SQS per un'ulteriore elaborazione.
applications.properties spring.cloud.stream.bindings.sendToSQS-in-0.destination=${kinesisstreamname} spring.cloud.stream.bindings.sendToSQS-in-0.content-type=application/ddb QueueService.java @Bean public Consumer<Flight> sendToSQS() { return this::forwardEventsToSQS; } public void forwardEventsToSQS(Flight flight) { GetQueueUrlRequest getQueueRequest = GetQueueUrlRequest.builder() .queueName(sqsQueueName) .build(); String queueUrl = this.sqsClient.getQueueUrl(getQueueRequest).queueUrl(); try { SendMessageRequest send_msg_request = SendMessageRequest.builder() .queueUrl(queueUrl) .messageBody(objectMapper.writeValueAsString(flight)) .messageGroupId("1") .messageDeduplicationId(flight.getId().toString()) .build(); sqsClient.sendMessage(send_msg_request); } catch (IOException | AmazonServiceException e) { logger.error("Error sending message to SQS", e); } }