Sviluppa consumatori con produttività condivisa con AWS SDK for Java - Flusso di dati Amazon Kinesis

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Sviluppa consumatori con produttività condivisa con AWS SDK for Java

Uno dei metodi per sviluppare utenti Kinesis Data Streams personalizzati con condivisione integrale consiste nell'utilizzare Amazon APIs Kinesis Data Streams con. AWS SDK for Java Questa sezione descrive l'utilizzo di Kinesis APIs Data AWS SDK for Java Streams con. È possibile chiamare Kinesis APIs Data Streams utilizzando altri linguaggi di programmazione diversi. Per ulteriori informazioni su tutto ciò che è disponibile AWS SDKs, consulta Inizia a sviluppare con Amazon Web Services.

Il codice di esempio Java in questa sezione mostra come eseguire le operazioni di base di Kinesis Data API Streams ed è suddiviso logicamente per tipo di operazione. Questi esempi non rappresentano il codice pronto per la produzione. Non verificano la presenza di eccezioni o di account per tutte le possibili considerazioni di sicurezza o di prestazione.

Ottieni dati da un flusso

I Kinesis APIs Data Streams getShardIterator includono i metodi e che getRecords è possibile richiamare per recuperare i record da un flusso di dati. Si tratta di un modello pull, dove il codice disegna i record di dati direttamente dalle partizioni del flusso di dati.

Importante

Ti consigliamo di utilizzare il supporto per il processore di record fornito da KCL per recuperare i record dai tuoi flussi di dati. Si tratta di un modello push, dove è possibile implementare il codice che elabora i dati. KCLRecupera i record di dati dal flusso di dati e li invia al codice dell'applicazione. Inoltre, KCL fornisce funzionalità di failover, ripristino e bilanciamento del carico. Per ulteriori informazioni, vedere Sviluppo di consumatori personalizzati con utilizzo di throughput condiviso. KCL

Tuttavia, in alcuni casi potresti preferire utilizzare Kinesis APIs Data Streams. Ad esempio, per implementare strumenti personalizzati per il monitoraggio o il debug dei tuoi flussi di dati.

Importante

Il flusso di dati Kinesis supporta le modifiche al periodo di conservazione dei record di dati del tuo flusso di dati. Per ulteriori informazioni, consulta Modifica il periodo di conservazione dei dati.

Usa gli iteratori shard

Puoi recuperare i record dal flusso per shard. Per ogni shard e per ogni batch di record recuperati da quello shard, è necessario ottenere un iteratore shard. L'iteratore shard viene utilizzato nell'oggetto getRecordsRequest per specificare lo shard da cui devono essere recuperati i dati. Il tipo associato all'iteratore shard determina il punto nello shard da cui devono essere recuperati i record (vedi più avanti in questa sezione per ulteriori dettagli). Prima di poter lavorare con lo shard iterator, è necessario recuperare lo shard. Per ulteriori informazioni, consulta Elenca i frammenti.

Ottieni l'iteratore shard iniziale utilizzando il metodo getShardIterator. Ottieni iteratori di shard per ulteriori batch di record utilizzando il metodo getNextShardIterator dell'oggetto getRecordsResult restituito dal metodo getRecords. Un iteratore shard è valido per 5 minuti. Se utilizzi un iteratore shard mentre è valido, puoi ottenerne uno nuovo. Ogni iteratore shard rimane valido per 5 minuti, anche dopo il suo utilizzo.

Per ottenere l'iteratore shard iniziale, crea un'istanza GetShardIteratorRequest e passala al metodo getShardIterator. Per configurare la richiesta, specifica il flusso e l'ID dello shard. Per informazioni su come ottenere gli stream del tuo account, consulta. AWS Visualizzazione dell'elenco di flussi Per informazioni su come ottenere gli shard in un flusso, consulta Elenca i frammenti.

String shardIterator; GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(myStreamName); getShardIteratorRequest.setShardId(shard.getShardId()); getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest); shardIterator = getShardIteratorResult.getShardIterator();

Questo codice di esempio specifica TRIM_HORIZON come il tipo di iteratore che si utilizza per ottenere l'iteratore shard iniziale. Questo tipo di iterazione significa che i record devono essere restituiti a partire dal primo record aggiunto alla partizione anziché a partire dal record aggiunto più di recente, noto anche come estremità. I possibili tipi di iteratore sono i seguenti:

  • AT_SEQUENCE_NUMBER

  • AFTER_SEQUENCE_NUMBER

  • AT_TIMESTAMP

  • TRIM_HORIZON

  • LATEST

Per ulteriori informazioni, consulta ShardIteratorType.

Per alcuni tipi di iteratore è necessario specificare un numero di sequenza in aggiunta al tipo, ad esempio:

getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER"); getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);

Dopo aver ottenuto un record utilizzando getRecords, è possibile ottenere il numero di sequenza per il record chiamando il metodo getSequenceNumber del record.

record.getSequenceNumber()

Inoltre, il codice che aggiunge record per il flusso di dati è in grado di ottenere il numero di sequenza per un record aggiunto chiamando getSequenceNumber sul risultato di putRecord.

lastSequenceNumber = putRecordResult.getSequenceNumber();

È possibile utilizzare i numeri di sequenza per garantire che l'ordine dei record sia rigorosamente ascendente. Per ulteriori informazioni, consulta il codice di esempio in PutRecordesempio.

Usa GetRecords

Dopo aver ottenuto l'iteratore shard, crea un'istanza di un oggetto GetRecordsRequest. Specifica l'iteratore per la richiesta utilizzando il metodo setShardIterator.

Facoltativamente, puoi anche impostare il numero di record da recuperare utilizzando il metodo setLimit. Il numero di record restituiti da getRecords è sempre pari o inferiori a questo limite. Se non si specifica questo limite, getRecords restituisce 10 MB di record recuperati. Il seguente codice di esempio imposta questo limite a 25 record.

Se non viene restituito alcun record, ciò significa che non sono attualmente disponibili record di dati da questo shard al numero di sequenza a cui fa riferimento l'iteratore shard. In questo caso la tua applicazione deve attendere per un periodo di tempo appropriato per le origini dati per il flusso. Di seguito, cerca di ottenere di nuovo dati dallo shard utilizzando l'iteratore shard restituito dalla chiamata precedente a getRecords.

Passa la getRecordsRequest al metodo getRecords e acquisisci il valore restituito come un oggetto getRecordsResult. Per ottenere i record di dati, chiama il metodo getRecords nell'oggetto getRecordsResult.

GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest); List<Record> records = getRecordsResult.getRecords();

Per prepararti a un'altra chiamata a getRecords, ottieni l'iteratore shard successivo da getRecordsResult.

shardIterator = getRecordsResult.getNextShardIterator();

Per ottenere migliori risultati, sospendi l'attività per almeno 1 secondo (1.000 millisecondi) tra le chiamate a getRecords per evitare di superare il limite di frequenza di getRecords.

try { Thread.sleep(1000); } catch (InterruptedException e) {}

In genere, si deve effettuare la chiamata getRecords in un loop, anche quando si esegue il recupero di un singolo record in uno scenario di test. Una singola chiamata a getRecords può restituire un elenco di record vuoto, anche quando lo shard contiene più record in numeri di sequenza successivi. In questo caso, il NextShardIterator restituito con l'elenco vuoto fa riferimento a un numero di sequenza successivo nello shard e, in conclusione, le chiamate successive a getRecords restituiscono i record. L'esempio seguente dimostra l'uso di un loop.

Esempio: getRecords

Il seguente codice di esempio riflette i suggerimenti in merito a getRecords in questa sezione, tra cui l'esecuzione di chiamate in un loop.

// Continuously read data records from a shard List<Record> records; while (true) { // Create a new getRecordsRequest with an existing shardIterator // Set the maximum records to return to 25 GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult result = client.getRecords(getRecordsRequest); // Put the result into record list. The result can be empty. records = result.getRecords(); try { Thread.sleep(1000); } catch (InterruptedException exception) { throw new RuntimeException(exception); } shardIterator = result.getNextShardIterator(); }

Se si sta utilizzando la Kinesis Client Library, potrebbe effettuare più chiamate prima di restituire i dati. Questo comportamento è predefinito e non indica un problema con i dati KCL o con i tuoi dati.

Adattarsi a un reshard

Se getRecordsResult.getNextShardIterator restituisce null, indica che si è verificata una divisione o un'unione della partizione che ha interessato questa partizione. Questa partizione si trova ora nello stato CLOSED e hai letto tutti i record di dati disponibili da questa partizione.

In questo scenario, è possibile utilizzare getRecordsResult.childShards per conoscere le nuove partizioni secondarie della partizione in fase di elaborazione che sono state create dalla divisione o dall'unione. Per ulteriori informazioni, vedere. ChildShard

Nel caso di un frazionamento, i due nuovi shard hanno entrambi un parentShardId identico all'ID dello shard che si stava elaborando in precedenza. Il valore di adjacentParentShardId per entrambi gli shard è null.

Nel caso di una fusione, il singolo nuovo shard creato mediante la fusione ha un parentShardId identico all'ID di uno degli shard principali e un adjacentParentShardId identico all'ID dell'altro shard principale. La tua applicazione ha già letto tutti i dati provenienti da uno di questi shard. Questo è lo shard per cui getRecordsResult.getNextShardIterator ha restituito null. Se l'ordine dei dati è importante per la tua applicazione, assicurati che quest'ultima legga anche tutti i dati dall'altro shard principale prima di leggere qualsiasi nuovo dato dallo shard secondario creato dalla fusione.

Se sta utilizzando più processori per recuperare dati dal flusso (ad esempio, un processore per shard) e si verifica una frammentazione o una fusione di shard, devi aumentare o diminuire il numero di processori per adattarti alla variazione nel numero di shard.

Per ulteriori informazioni sul resharding, tra cui una discussione in merito agli stati degli shard - ad esempio CLOSED - consulta Condividi nuovamente uno stream.