Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Risolvi i problemi dei consumatori di Kinesis Data Streams
I seguenti argomenti offrono soluzioni a problemi comuni con i consumatori di Amazon Kinesis Data Streams:
- Errore di compilazione con il costruttore LeaseManagementConfig
- Alcuni record Kinesis Data Streams vengono ignorati quando si utilizza la Kinesis Client Library
- I record che appartengono allo stesso frammento vengono elaborati da diversi processori di record contemporaneamente
- L'applicazione consumer sta leggendo a una velocità inferiore al previsto
- GetRecords restituisce un array di record vuoto anche quando nel flusso sono presenti dati
- Lo shard uterator scade inaspettatamente
- L'elaborazione dei record dei consumatori è in ritardo
- Errore di autorizzazione della chiave KMS master non autorizzata
- Risolvi altri problemi comuni per i consumatori
Errore di compilazione con il costruttore LeaseManagementConfig
Durante l'aggiornamento a Kinesis Client Library (KCL) versione 3.x o successiva, è possibile che si verifichi un errore di compilazione relativo al costruttore. LeaseManagementConfig
Se state creando direttamente un LeaseManagementConfig
oggetto per impostare le configurazioni anziché utilizzarlo ConfigsBuilder
nelle KCL versioni 3.x o successive, potreste visualizzare il seguente messaggio di errore durante la compilazione del codice dell'applicazione. KCL
Cannot resolve constructor 'LeaseManagementConfig(String, DynamoDbAsyncClient, KinesisAsyncClient, String)'
KCLcon le versioni 3.x o successive richiede l'aggiunta di un altro parametro, applicationName (tipo: String), dopo il parametro. tableName
-
Prima: leaseManagementConfig = new LeaseManagementConfig (tableName,,dynamoDBClient, kinesisClientstreamName,workerIdentifier)
-
Dopo: leaseManagementConfig = new LeaseManagementConfig (tableName, applicationName,dynamoDBClient,kinesisClient,streamName,workerIdentifier)
Invece di creare direttamente un LeaseManagementConfig oggetto, si consiglia di utilizzarlo ConfigsBuilder
per impostare le configurazioni in KCL 3.x e versioni successive. ConfigsBuilder
offre un modo più flessibile e gestibile per configurare l'applicazione. KCL
Di seguito è riportato un esempio di utilizzo ConfigsBuilder
per impostare le KCL configurazioni.
ConfigsBuilder configsBuilder = new ConfigsBuilder( streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory() ); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig() .failoverTimeMillis(60000), // this is an example configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() );
Alcuni record Kinesis Data Streams vengono ignorati quando si utilizza la Kinesis Client Library
La causa più comune di record ignorati è un'eccezione non gestita generata da processRecords
. La Kinesis Client Library (KCL) si basa sul processRecords
codice dell'utente per gestire eventuali eccezioni derivanti dall'elaborazione dei record di dati. Qualsiasi eccezione generata da processRecords
viene assorbita da. KCL Per evitare infiniti tentativi in caso di errore ricorrente, KCL non invia nuovamente il batch di record elaborati al momento dell'eccezione. KCLQuindi richiama processRecords
il successivo batch di record di dati senza riavviare il processore di registrazione. Ciò avviene nelle applicazioni dei consumatori che osservano i record ignorati. Per impedire che i record vengano ignorati, è necessario gestire tutte le eccezioni all'interno di processRecords
in modo appropriato.
I record che appartengono allo stesso frammento vengono elaborati da diversi processori di record contemporaneamente
Per ogni applicazione Kinesis Client Library (KCL) in esecuzione, uno shard ha un solo proprietario. Tuttavia, più processori di record possono elaborare temporaneamente lo stesso shard. Nel caso di un'istanza di lavoro che perde la connettività di rete, KCL presume che il worker irraggiungibile non stia più elaborando i record dopo la scadenza del tempo di failover e ordina ad altre istanze di lavoro di prendere il sopravvento. Per un breve periodo, i nuovi processori record e i processori record dei lavoratori irraggiungibili possono elaborare i dati dello stesso shard.
È consigliabile impostare un tempo di failover appropriato per la tua applicazione. Per le applicazioni a bassa latenza, l'impostazione predefinita di 10 secondi può rappresentare il tempo massimo che si desidera attendere. Tuttavia, nel caso in cui si prevedano problemi di connettività, ad esempio chiamate in aree geografiche dove la connettività potrebbe andare perduta con maggiore frequenza, questo numero può risultare troppo basso.
L'applicazione deve anticipare e gestire questo scenario, soprattutto perché la connettività di rete viene in genere ripristinata per il lavoratore precedentemente non raggiungibile. Se un record ha un processore shard assunto da un altro processore del record, è necessario gestire i seguenti due casi per eseguire un arresto regolare:
-
Una volta completata la chiamata corrente a, richiama
processRecords
il metodo shutdown sul KCL registratore con il motivo di spegnimento ''. ZOMBIE Si prevede che i tuoi processori record puliscano tutte le risorse in modo appropriato e quindi escano. -
Quando tenti di entrare in un checkpoint da un operaio «zombie», lui ti lancia. KCL
ShutdownException
Dopo aver ricevuto questa eccezione, il tuo codice dovrebbe uscire in modo pulito dal metodo corrente.
Per ulteriori informazioni, consulta Gestisci i record duplicati.
L'applicazione consumer sta leggendo a una velocità inferiore al previsto
I motivi più comuni per cui il rendimento di lettura è più lento del previsto sono i seguenti:
-
Molteplici applicazioni consumatori hanno letture totali che superano i limiti per shard. Per ulteriori informazioni, consulta Quote e limiti. In questo caso, è possibile aumentare il numero di partizioni nel flusso di dati Kinesis.
-
Il limite che specifica il numero massimo di GetRecords per chiamata potrebbe essere stato configurato con un valore basso. Se si utilizza ilKCL, è possibile che il lavoratore abbia configurato un valore basso per la
maxRecords
proprietà. In generale, consigliamo di utilizzare i valori predefiniti del sistema per questa proprietà. -
La logica all'interno della
processRecords
chiamata potrebbe impiegare più tempo del previsto per una serie di possibili ragioni; la logica potrebbe essere CPU intensa, bloccare l'I/O o ostacolare la sincronizzazione. Per verificare se ciò è vero, il test esegue processori di record vuoti e confronta il rendimento di lettura. Per informazioni su come gestire i dati in entrata, consulta Usa il resharding, lo scaling e l'elaborazione parallela per modificare il numero di shard.
Se disponi di una sola applicazione consumer, puoi sempre leggere almeno due volte più velocemente della velocità di inserimento. Questo perché puoi scrivere fino a 1.000 record al secondo per scritture, con una velocità massima totale di scrittura dei dati pari a 1 MB al secondo (comprese le chiavi di partizioni). Ogni partizione può supportare fino a 5 transazioni al secondo, con una velocità di lettura totale massima di 2 MB al secondo. Considera che ogni lettura (chiamata GetRecords) ottiene un batch di record. La dimensione dei dati restituiti da GetRecords varia a seconda dell'utilizzo dello shard. La dimensione massima di dati che GetRecords può restituire è 10 MB. Se una chiamata restituisce tale limite, le successive chiamate effettuate nei prossimi 5 secondi generano ProvisionedThroughputExceededException
.
GetRecords restituisce un array di record vuoto anche quando nel flusso sono presenti dati
Consumare o ottenere record è un modello di pull. Ci si aspetta che gli sviluppatori effettuino chiamate GetRecordsa ciclo continuo senza back-off. Ogni chiamata a GetRecords inoltre restituisce un valore ShardIterator
che deve essere usato nella prossima iterazione del ciclo.
L'operazione GetRecords non si blocca. Al contrario, ritorna immediatamente; con uno dei record di dati pertinenti o con un elemento Records
vuoto. Un elemento Records
vuoto viene restituito a due condizioni:
-
Non ci sono più dati al momento nello shard.
-
Non ci sono dati vicino alla parte dello shard puntato dal
ShardIterator
.
Quest'ultima condizione è lieve, ma è un compromesso di progettazione necessario per evitare tempi di ricerca illimitati (latenza) per il recupero dei record. Pertanto, l'applicazione che consuma il flusso dovrebbe eseguire il ciclo e chiamare GetRecords nella gestione dei record vuoti normalmente.
In uno scenario di produzione, l'unica volta in cui il ciclo continuo deve essere chiuso è quando il valore NextShardIterator
è NULL
. Quando NextShardIterator
è NULL
, significa che l'attuale shard è stato chiuso e il valore ShardIterator
punterebbe altrimenti oltre l'ultimo record. Se l'applicazione che consuma non chiama mai SplitShard oMergeShards, lo shard rimane aperto e le chiamate a GetRecords non restituiranno mai un valore NextShardIterator
che è NULL
.
Se utilizzi Kinesis Client Library (KCL), il modello di consumo sopra riportato viene riassunto automaticamente. Questo include la gestione automatica di un set di shard che cambia in modo dinamico. ConKCL, lo sviluppatore fornisce solo la logica per elaborare i record in entrata. Questo è possibile perché la libreria genera continue chiamate ai GetRecords per te.
Lo shard uterator scade inaspettatamente
Un nuovo iteratore shard viene restituito da ogni richiesta GetRecords (come NextShardIterator
), che utilizzerai nella prossima richiesta GetRecords (come ShardIterator
). Di solito, questo iteratore shard non scade prima dell'uso. Tuttavia, è possibile che gli iteratori shard scadano perché non hai chiamato GetRecords per più di 5 minuti o perché hai eseguito un riavvio della tua applicazione consumer.
Se l'iteratore di partizioni scade immediatamente prima di essere utilizzato, questo può indicare che la tabella DynamoDB utilizzata da Kinesis non dispone di capacità sufficiente per memorizzare i dati del lease. Questa situazione è più probabile se disponi di un numero elevato di shard. Per risolvere il problema, aumenta la capacità di scrittura assegnata alla tabella shard. Per ulteriori informazioni, consulta Utilizzate una tabella di leasing per tenere traccia degli shard elaborati dall'applicazione consumer KCL.
L'elaborazione dei record dei consumatori è in ritardo
Per la maggior parte dei casi d'uso, le applicazioni leggono i dati più recenti dal flusso. In alcune circostanze, le letture dei consumatori possono rimanere indietro, il che potrebbe non essere desiderato. Dopo aver identificato il punto in cui i consumatori stanno leggendo, guarda i motivi più comuni per cui i consumatori restano indietro.
Inizia con il parametro GetRecords.IteratorAgeMilliseconds
, che monitora la posizione di lettura in tutti gli shard e i consumatori nel flusso. Nota che se l'età di un iteratore supera il 50% del periodo di conservazione (per impostazione predefinita 24 ore, configurabile fino a 7 giorni), sussiste il rischio di perdita di dati a causa della scadenza del record. Una rapida soluzione di ripiego consiste nell'aumentare il periodo di conservazione. Ciò interrompe la perdita di dati importanti man mano che si risolve il problema. Per ulteriori informazioni, consulta Monitora il servizio Amazon Kinesis Data Streams con Amazon CloudWatch. Successivamente, identifica il ritardo con cui la tua applicazione consumer sta leggendo ogni shard utilizzando una CloudWatch metrica personalizzata emessa dalla Kinesis Client Library (),. KCL MillisBehindLatest
Per ulteriori informazioni, consulta Monitora la libreria client Kinesis con Amazon CloudWatch.
Ecco i motivi più comuni per cui i consumatori possono rimanere indietro:
-
I forti aumenti improvvisi
GetRecords.IteratorAgeMilliseconds
o, inMillisBehindLatest
genere, indicano un problema transitorio, ad esempio errori API operativi di un'applicazione a valle. Se uno dei parametri mostra costantemente questo comportamento, è consigliabile investigare su questi improvvisi aumenti. -
Un aumento graduale di questi parametri indica che un consumer non è al passo con lo streaming perché non sta elaborando i record abbastanza velocemente. Le cause principali più comuni di questo comportamento sono risorse fisiche insufficienti o logica di elaborazione dei record che non è stata ridimensionata con un aumento del rendimento del flusso. È possibile verificare questo comportamento esaminando le altre CloudWatch metriche personalizzate KCL emesse dagli indicatori associati all'
processTask
operazione, tra cui, e.RecordProcessor.processRecords.Time
Success
RecordsProcessed
-
Se ottieni un aumento del parametro
processRecords.Time
correlato con l'aumento del rendimento, devi analizzare la logica di elaborazione dei record per identificare il motivo per cui non si ridimensiona con il rendimento aumentato. -
Se ottieni un aumento per i valori
processRecords.Time
che non sono correlati con un maggiore rendimento, controlla se stai effettuando chiamate di blocco nel percorso critico, che sono spesso causa di rallentamenti nell'elaborazione dei record. Un approccio alternativo è aumentare il parallelismo aumentando il numero di shard. Infine, verificate di disporre di una quantità adeguata di risorse fisiche (memoria, CPU utilizzo, ecc.) sui nodi di elaborazione sottostanti durante i picchi di domanda.
-
Errore di autorizzazione della chiave KMS master non autorizzata
Questo errore si verifica quando un'applicazione consumer legge da un flusso crittografato senza autorizzazioni sulla KMS chiave master. Per assegnare le autorizzazioni a un'applicazione per accedere a una KMS chiave, vedere Utilizzo delle politiche chiave in AWS KMS e Utilizzo delle politiche con. IAM AWS KMS
Risolvi altri problemi comuni per i consumatori
-
Perché il trigger del flusso di dati Kinesis non è in grado di richiamare la mia funzione Lambda?
-
Perché riscontro problemi di latenza elevata con il flusso di dati Kinesis?
-
Perché il mio flusso di dati Kinesis restituisce un errore interno del server 500?
-
Come posso risolvere un'KCLapplicazione bloccata o bloccata per Kinesis Data Streams?