Gestione di record duplicati - Flusso di dati Amazon Kinesis

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Gestione di record duplicati

Ci sono due motivi principali per cui i record possono essere distribuiti più di una volta alla tua applicazione del flusso di dati Amazon Kinesis: i nuovi tentativi del producer e i nuovi tentativi del consumer. La tua applicazione deve prevedere e gestire in modo appropriato l'elaborazione di singoli record più volte.

Nuovi tentativi del producer

Considera un producer in cui si verifica un timeout della rete dopo una chiamata a PutRecord, ma prima di poter ricevere una conferma da Flusso di dati Amazon Kinesis . Il producer non può essere certo se il record è stato distribuito a . Considerando che ogni record è importante per l'applicazione, il producer sarebbe stato scritto per riprovare la chiamata con gli stessi dati. Se entrambe le chiamate a PutRecord sugli stessi dati sono state inviate correttamente al flusso di dati Kinesis saranno presenti due record del flusso di dati Kinesis. Anche se i due record dispongono di dati identici, hanno anche numeri di sequenza univoci. Le applicazioni che richiedono garanzie rigorose devono incorporare una chiave primaria nel record per rimuovere i duplicati più avanti nel corso dell'elaborazione. Si noti che il numero di duplicati dovuti a nuovi tentativi del producer è solitamente basso rispetto al numero di duplicati dovuti a nuovi tentativi del consumer.

Nota

Se utilizzi l' AWS SDKPutRecord, scopri il comportamento di SDK Retry nella guida per l'utente di SDK and Tools.AWS

Nuovi tentativi del consumer

I nuovi tentativi del consumer (applicazione di elaborazione dei dati) si verificano quando si riavviano i processori di record. I processori di record per lo stesso shard vengono riavviati nei seguenti casi:

  1. Il lavoratore viene terminato in modo inaspettato

  2. Le istanze del lavoratore vengono aggiunte o rimosse

  3. Gli shard sono fusi o frazionati

  4. L'applicazione viene distribuita

In tutti questi casi, la mappatura shards-to-worker-to -record-processor viene continuamente aggiornata per l'elaborazione del bilanciamento del carico. I processori di shard migrati ad altre istanze riavviano l'elaborazione di record dall'ultimo checkpoint. Ciò comporta un'elaborazione di record duplicati, come illustrato nell'esempio seguente. Per ulteriori informazioni sul bilanciamento del carico, consulta Resharding, dimensionamento ed elaborazione parallela.

Esempio: nuovi tentativi del consumer che comportano una nuova distribuzione dei record

In questo esempio, si dispone di un'applicazione che legge i record da un flusso in modo continuo, aggrega i record in un file locale e carica il file in Amazon S3. Per semplicità, supponiamo di avere solo uno shard e un lavoratore che elabora lo shard. Considera il seguente esempio di sequenza di eventi, supponendo che l'ultimo checkpoint si è verificato al numero di record 10.000:

  1. Un lavoratore legge il batch successivo di record dallo shard, i record da 10.001 a 20.000.

  2. Di seguito, il lavoratore trasferisce il batch di record al processore di record associato.

  3. Il processore di record aggrega i dati, crea un file Amazon S3 e carica correttamente il file in Amazon S3.

  4. Il lavoratore viene terminato in modo inaspettato prima che possa verificarsi un nuovo checkpoint.

  5. La applicazione, il lavoratore e il processore di record si riavviano.

  6. A partire da questo momento, il lavoratore comincia a leggere dall'ultimo checkpoint eseguito correttamente, in questo caso 10.001.

Pertanto, i record 10.001-20.000 vengono consumati più di una volta.

Resistenza ai nuovi tentativi del consumer

Anche se è possibile che i record siano elaborati più di una volta, l'applicazione potrebbe voler presentare gli effetti collaterali come se i record fossero stati elaborati solo una volta (elaborazione idempotente). Le soluzioni a questo problema variano in complessità e accuratezza. Se la destinazione dei dati finali è in grado di gestire correttamente i duplicati, ti consigliamo di affidarti alla destinazione finale per ottenere l'elaborazione idempotente. Ad esempio, con Elasticsearch è possibile utilizzare una combinazione di funzione Versioni multiple e ID univoci per evitare l'elaborazione duplicata.

Nell'applicazione di esempio nella sezione precedente, l'applicazione legge in modo continuo i record da un flusso, aggrega i record in un file locale e carica il file in Amazon S3. Come indicato, i record da 10.001 a 20.000 vengono consumati più di una volta e ciò comporta la presenza di più file Amazon S3 con gli stessi dati. Un modo per mitigare i duplicati in questo esempio è di assicurarsi che nella fase 3 sia utilizzato il seguente schema:

  1. Il processore di record utilizza un numero fisso di record per file Amazon S3, ad esempio 5.000.

  2. Il nome di file utilizza questo schema: prefisso di Amazon S3, ID della partizione e First-Sequence-Num. In questo caso, potrebbe essere qualcosa di simile a sample-shard000001-10001.

  3. Una volta caricato il file Amazon S3, crea un checkpoint specificando Last-Sequence-Num. In questo caso, il checkpoint verrebbe eseguito al numero di record 15.000.

Con questo schema, anche se i record vengono elaborati più di una volta, il file Amazon S3 risultante ha lo stesso nome e gli stessi dati. I nuovi tentativi hanno come risultato esclusivamente la scrittura degli stessi dati nello stesso file più di una volta.

Nel caso di un'operazione di reshard, il numero di record rimasti nello shard potrebbe essere inferiore al numero fisso desiderato. In questo caso, il tuo metodo shutdown() deve scaricare il file ad Amazon S3 ed eseguire il checkpoint nell'ultimo numero di sequenza. Il suddetto schema è compatibile anche con le operazioni di reshard.