Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Sviluppa consumatori con produttività condivisa con AWS SDK for Java
Uno dei metodi per sviluppare utenti Kinesis Data Streams personalizzati con condivisione integrale consiste nell'utilizzare Amazon APIs Kinesis Data Streams con. AWS SDK for Java Questa sezione descrive l'utilizzo di Kinesis APIs Data AWS SDK for Java Streams con. È possibile chiamare Kinesis APIs Data Streams utilizzando altri linguaggi di programmazione diversi. Per ulteriori informazioni su tutto ciò che è disponibile AWS SDKs, consulta Inizia a sviluppare con Amazon Web Services
Il codice di esempio Java in questa sezione mostra come eseguire le operazioni di base di Kinesis Data API Streams ed è suddiviso logicamente per tipo di operazione. Questi esempi non rappresentano il codice pronto per la produzione. Non verificano la presenza di eccezioni o di account per tutte le possibili considerazioni di sicurezza o di prestazione.
Ottieni dati da un flusso
I Kinesis APIs Data Streams getShardIterator
includono i metodi e che getRecords
è possibile richiamare per recuperare i record da un flusso di dati. Si tratta di un modello pull, dove il codice disegna i record di dati direttamente dalle partizioni del flusso di dati.
Importante
Ti consigliamo di utilizzare il supporto per il processore di record fornito da KCL per recuperare i record dai tuoi flussi di dati. Si tratta di un modello push, dove è possibile implementare il codice che elabora i dati. KCLRecupera i record di dati dal flusso di dati e li invia al codice dell'applicazione. Inoltre, KCL fornisce funzionalità di failover, ripristino e bilanciamento del carico. Per ulteriori informazioni, vedere Sviluppo di consumatori personalizzati con utilizzo di throughput condiviso. KCL
Tuttavia, in alcuni casi potresti preferire utilizzare Kinesis APIs Data Streams. Ad esempio, per implementare strumenti personalizzati per il monitoraggio o il debug dei tuoi flussi di dati.
Importante
Il flusso di dati Kinesis supporta le modifiche al periodo di conservazione dei record di dati del tuo flusso di dati. Per ulteriori informazioni, consulta Modifica il periodo di conservazione dei dati.
Usa gli iteratori shard
Puoi recuperare i record dal flusso per shard. Per ogni shard e per ogni batch di record recuperati da quello shard, è necessario ottenere un iteratore shard. L'iteratore shard viene utilizzato nell'oggetto getRecordsRequest
per specificare lo shard da cui devono essere recuperati i dati. Il tipo associato all'iteratore shard determina il punto nello shard da cui devono essere recuperati i record (vedi più avanti in questa sezione per ulteriori dettagli). Prima di poter lavorare con lo shard iterator, è necessario recuperare lo shard. Per ulteriori informazioni, consulta Elenca i frammenti.
Ottieni l'iteratore shard iniziale utilizzando il metodo getShardIterator
. Ottieni iteratori di shard per ulteriori batch di record utilizzando il metodo getNextShardIterator
dell'oggetto getRecordsResult
restituito dal metodo getRecords
. Un iteratore shard è valido per 5 minuti. Se utilizzi un iteratore shard mentre è valido, puoi ottenerne uno nuovo. Ogni iteratore shard rimane valido per 5 minuti, anche dopo il suo utilizzo.
Per ottenere l'iteratore shard iniziale, crea un'istanza GetShardIteratorRequest
e passala al metodo getShardIterator
. Per configurare la richiesta, specifica il flusso e l'ID dello shard. Per informazioni su come ottenere gli stream del tuo account, consulta. AWS Visualizzazione dell'elenco di flussi Per informazioni su come ottenere gli shard in un flusso, consulta Elenca i frammenti.
String shardIterator; GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(myStreamName); getShardIteratorRequest.setShardId(shard.getShardId()); getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest); shardIterator = getShardIteratorResult.getShardIterator();
Questo codice di esempio specifica TRIM_HORIZON
come il tipo di iteratore che si utilizza per ottenere l'iteratore shard iniziale. Questo tipo di iterazione significa che i record devono essere restituiti a partire dal primo record aggiunto alla partizione anziché a partire dal record aggiunto più di recente, noto anche come estremità. I possibili tipi di iteratore sono i seguenti:
-
AT_SEQUENCE_NUMBER
-
AFTER_SEQUENCE_NUMBER
-
AT_TIMESTAMP
-
TRIM_HORIZON
-
LATEST
Per ulteriori informazioni, consulta ShardIteratorType.
Per alcuni tipi di iteratore è necessario specificare un numero di sequenza in aggiunta al tipo, ad esempio:
getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER"); getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);
Dopo aver ottenuto un record utilizzando getRecords
, è possibile ottenere il numero di sequenza per il record chiamando il metodo getSequenceNumber
del record.
record.getSequenceNumber()
Inoltre, il codice che aggiunge record per il flusso di dati è in grado di ottenere il numero di sequenza per un record aggiunto chiamando getSequenceNumber
sul risultato di putRecord
.
lastSequenceNumber = putRecordResult.getSequenceNumber();
È possibile utilizzare i numeri di sequenza per garantire che l'ordine dei record sia rigorosamente ascendente. Per ulteriori informazioni, consulta il codice di esempio in PutRecordesempio.
Usa GetRecords
Dopo aver ottenuto l'iteratore shard, crea un'istanza di un oggetto GetRecordsRequest
. Specifica l'iteratore per la richiesta utilizzando il metodo setShardIterator
.
Facoltativamente, puoi anche impostare il numero di record da recuperare utilizzando il metodo setLimit
. Il numero di record restituiti da getRecords
è sempre pari o inferiori a questo limite. Se non si specifica questo limite, getRecords
restituisce 10 MB di record recuperati. Il seguente codice di esempio imposta questo limite a 25 record.
Se non viene restituito alcun record, ciò significa che non sono attualmente disponibili record di dati da questo shard al numero di sequenza a cui fa riferimento l'iteratore shard. In questo caso la tua applicazione deve attendere per un periodo di tempo appropriato per le origini dati per il flusso. Di seguito, cerca di ottenere di nuovo dati dallo shard utilizzando l'iteratore shard restituito dalla chiamata precedente a getRecords
.
Passa la getRecordsRequest
al metodo getRecords
e acquisisci il valore restituito come un oggetto getRecordsResult
. Per ottenere i record di dati, chiama il metodo getRecords
nell'oggetto getRecordsResult
.
GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest); List<Record> records = getRecordsResult.getRecords();
Per prepararti a un'altra chiamata a getRecords
, ottieni l'iteratore shard successivo da getRecordsResult
.
shardIterator = getRecordsResult.getNextShardIterator();
Per ottenere migliori risultati, sospendi l'attività per almeno 1 secondo (1.000 millisecondi) tra le chiamate a getRecords
per evitare di superare il limite di frequenza di getRecords
.
try { Thread.sleep(1000); } catch (InterruptedException e) {}
In genere, si deve effettuare la chiamata getRecords
in un loop, anche quando si esegue il recupero di un singolo record in uno scenario di test. Una singola chiamata a getRecords
può restituire un elenco di record vuoto, anche quando lo shard contiene più record in numeri di sequenza successivi. In questo caso, il NextShardIterator
restituito con l'elenco vuoto fa riferimento a un numero di sequenza successivo nello shard e, in conclusione, le chiamate successive a getRecords
restituiscono i record. L'esempio seguente dimostra l'uso di un loop.
Esempio: getRecords
Il seguente codice di esempio riflette i suggerimenti in merito a getRecords
in questa sezione, tra cui l'esecuzione di chiamate in un loop.
// Continuously read data records from a shard List<Record> records; while (true) { // Create a new getRecordsRequest with an existing shardIterator // Set the maximum records to return to 25 GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult result = client.getRecords(getRecordsRequest); // Put the result into record list. The result can be empty. records = result.getRecords(); try { Thread.sleep(1000); } catch (InterruptedException exception) { throw new RuntimeException(exception); } shardIterator = result.getNextShardIterator(); }
Se si sta utilizzando la Kinesis Client Library, potrebbe effettuare più chiamate prima di restituire i dati. Questo comportamento è predefinito e non indica un problema con i dati KCL o con i tuoi dati.
Adattarsi a un reshard
Se getRecordsResult.getNextShardIterator
restituisce null
, indica che si è verificata una divisione o un'unione della partizione che ha interessato questa partizione. Questa partizione si trova ora nello stato CLOSED
e hai letto tutti i record di dati disponibili da questa partizione.
In questo scenario, è possibile utilizzare getRecordsResult.childShards
per conoscere le nuove partizioni secondarie della partizione in fase di elaborazione che sono state create dalla divisione o dall'unione. Per ulteriori informazioni, vedere. ChildShard
Nel caso di un frazionamento, i due nuovi shard hanno entrambi un parentShardId
identico all'ID dello shard che si stava elaborando in precedenza. Il valore di adjacentParentShardId
per entrambi gli shard è null
.
Nel caso di una fusione, il singolo nuovo shard creato mediante la fusione ha un parentShardId
identico all'ID di uno degli shard principali e un adjacentParentShardId
identico all'ID dell'altro shard principale. La tua applicazione ha già letto tutti i dati provenienti da uno di questi shard. Questo è lo shard per cui getRecordsResult.getNextShardIterator
ha restituito null
. Se l'ordine dei dati è importante per la tua applicazione, assicurati che quest'ultima legga anche tutti i dati dall'altro shard principale prima di leggere qualsiasi nuovo dato dallo shard secondario creato dalla fusione.
Se sta utilizzando più processori per recuperare dati dal flusso (ad esempio, un processore per shard) e si verifica una frammentazione o una fusione di shard, devi aumentare o diminuire il numero di processori per adattarti alla variazione nel numero di shard.
Per ulteriori informazioni sul resharding, tra cui una discussione in merito agli stati degli shard - ad esempio CLOSED
- consulta Condividi nuovamente uno stream.