Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
KCLInformazioni 1.x e 2.x
Nota
Le versioni 1.x e 2.x di Kinesis Client Library (KCL) sono obsolete. Consigliamo di effettuare la migrazione alla KCLversione 3.x, che offre prestazioni migliorate e nuove funzionalità. Per la KCL documentazione e la guida alla migrazione più recenti, consulta. Usa la libreria client Kinesis
Uno dei metodi per sviluppare applicazioni consumer personalizzate in grado di elaborare i dati dai flussi di KDS dati consiste nell'utilizzare la Kinesis Client Library KCL ().
Argomenti
- Informazioni su KCL (versioni precedenti)
- KCLversioni precedenti
- KCLconcetti (versioni precedenti)
- Utilizzate una tabella di leasing per tenere traccia degli shard elaborati dall'applicazione consumer KCL
- Elabora più flussi di dati con la stessa applicazione consumer KCL 2.x for Java
- Usa il registro con lo schema KCL AWS Glue
Nota
Sia per KCL 1.x che per KCL 2.x, si consiglia di eseguire l'aggiornamento alla versione KCL 1.x o alla versione KCL 2.x più recente, a seconda dello scenario di utilizzo. Sia KCL 1.x che KCL 2.x vengono regolarmente aggiornati con nuove versioni che includono le ultime patch di dipendenza e sicurezza, correzioni di bug e nuove funzionalità compatibili con le versioni precedenti. Per https://github.com/awslabs/amazon-kinesis-clientulteriori
Informazioni su KCL (versioni precedenti)
KCLti aiuta a consumare ed elaborare i dati da un flusso di dati Kinesis occupandoti di molte delle attività complesse associate all'elaborazione distribuita. Queste includono il bilanciamento del carico su più istanze di applicazioni consumer, la risposta agli errori delle istanze delle applicazioni consumer, il checkpoint dei record elaborati e la reazione al ripartizionamento. Si KCL occupa di tutte queste attività secondarie in modo che tu possa concentrare i tuoi sforzi sulla scrittura di una logica di elaborazione dei record personalizzata.
KCLÈ diverso dai Kinesis APIs Data Streams disponibili in. AWS SDKs Kinesis APIs Data Streams ti aiuta a gestire molti aspetti di Kinesis Data Streams, tra cui la creazione di stream, il resharding e l'inserimento e l'acquisizione di record. KCLFornisce un livello di astrazione su tutte queste attività secondarie, in particolare per consentirti di concentrarti sulla logica di elaborazione dei dati personalizzata dell'applicazione consumer. Per informazioni su Kinesis API Data Streams, consulta Amazon API Kinesis Reference.
Importante
KCLÈ una libreria Java. Il supporto per linguaggi diversi da Java viene fornito utilizzando un'interfaccia multilingue chiamata. MultiLangDaemon Questo demone è basato su Java e viene eseguito in background quando si utilizza un KCL linguaggio diverso da Java. Ad esempio, se installi KCL for Python e scrivi la tua applicazione consumer interamente in Python, avrai comunque bisogno che Java sia installato sul tuo sistema a causa di. MultiLangDaemon Inoltre, MultiLangDaemon ha alcune impostazioni predefinite che potresti dover personalizzare in base al tuo caso d'uso, ad esempio la AWS regione a cui si connette. Per ulteriori informazioni su MultiLangDaemon on GitHub, vedi KCL MultiLangDaemon project
KCLFunge da intermediario tra la logica di elaborazione dei record e Kinesis Data Streams.
KCLversioni precedenti
Attualmente, puoi utilizzare una delle seguenti versioni supportate di KCL per creare applicazioni consumer personalizzate:
-
KCL1.x
Per ulteriori informazioni, consulta Sviluppa KCL consumatori 1.x
-
KCL2.x
Per ulteriori informazioni, consulta Sviluppa consumatori 2.x KCL
È possibile utilizzare KCL 1.x o KCL 2.x per creare applicazioni consumer che utilizzano un throughput condiviso. Per ulteriori informazioni, consulta Sviluppa consumatori personalizzati con un throughput condiviso utilizzando KCL.
Per creare applicazioni consumer che utilizzano un throughput dedicato (utenti fan-out avanzati), puoi utilizzare solo 2.x. KCL Per ulteriori informazioni, consulta Sviluppa consumatori con fan-out migliorati con un throughput dedicato.
Per informazioni sulle differenze tra KCL 1.x e KCL 2.x e istruzioni su come migrare da 1.x a 2.x, vedere. KCL KCL Esegui la migrazione dei consumatori da 1.x a 2.x KCL KCL
KCLconcetti (versioni precedenti)
-
KCLapplicazione consumer: un'applicazione personalizzata utilizzando KCL e progettata per leggere ed elaborare i record dai flussi di dati.
-
Istanza di applicazioni consumer: le applicazioni KCL consumer sono in genere distribuite, con una o più istanze applicative in esecuzione simultanea per coordinarsi in caso di guasti e bilanciare dinamicamente il carico di elaborazione dei record di dati.
-
Worker: una classe di alto livello utilizzata da un'istanza di applicazioni KCL consumer per iniziare l'elaborazione dei dati.
Importante
Ogni istanza dell'applicazione KCL consumer ha un worker.
Il worker inizializza e supervisiona varie attività, tra cui la sincronizzazione delle informazioni sulle partizioni e sui lease, il monitoraggio delle assegnazioni delle partizioni e l'elaborazione dei dati dalle partizioni. Un worker fornisce KCL le informazioni di configurazione per l'applicazione consumer, ad esempio il nome del flusso di dati i cui record di dati KCL verranno elaborati dall'applicazione consumer e le AWS credenziali necessarie per accedere a questo flusso di dati. Il worker avvia inoltre quella specifica istanza dell'applicazione KCL consumer per fornire i record di dati dal flusso di dati ai processori di record.
Importante
Nella versione KCL 1.x questa classe si chiama Worker. Per ulteriori informazioni (questi sono i KCL repository Java), vedere https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
. Nella versione KCL 2.x, questa classe è denominata Scheduler. Lo scopo di Scheduler in KCL 2.x è identico allo scopo di Worker in 1.x. KCL Per ulteriori informazioni sulla classe Scheduler in KCL 2.x, vedete/.java. https://github.com/awslabs/ amazon-kinesis-client blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler -
Lease: dati che definiscono l'associazione tra un worker e una partizione. Le applicazioni distribuite per i KCL consumatori utilizzano i contratti di locazione per suddividere l'elaborazione dei record di dati tra un parco di lavoratori. In qualsiasi momento, ogni frammento di record di dati è vincolato a un determinato lavoratore da un contratto di locazione identificato dalla variabile. leaseKey
Per impostazione predefinita, un lavoratore può detenere uno o più contratti di locazione (in base al valore della variabile maxLeasesForWorker) contemporaneamente.
Importante
Ogni worker si impegna a detenere tutti i lease disponibili per tutte le partizioni disponibili in un flusso di dati. Ma solo un worker alla volta si aggiudicherà con successo ogni lease.
Ad esempio, se si dispone di un'istanza di applicazione consumer A con worker A che elabora un flusso di dati con 4 partizioni, il worker A può detenere i lease per le partizioni 1, 2, 3 e 4 contemporaneamente. Tuttavia, se si dispone di due istanze di applicazioni consumer, A e B, con worker A e worker B, e queste istanze elaborano un flusso di dati con 4 partizioni, il worker A e il worker B non possono entrambi detenere il lease per la partizione 1 contemporaneamente. Un worker detiene il lease di una particolare partizione finché non è pronto a interrompere l'elaborazione dei record di dati della partizione o fino a quando non si verifica un guasto. Quando un worker smette di detenere il lease, un altro worker lo riprende e lo mantiene.
Per ulteriori informazioni (questi sono i KCL repository Java), vedere https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java per KCL 1.x e https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease
.java per 2.x. KCL -
Tabella di leasing: una tabella Amazon DynamoDB unica che viene utilizzata per tenere traccia degli shard KDS in un flusso di dati che vengono affittati ed elaborati dai lavoratori dell'applicazione consumer. KCL La tabella di leasing deve rimanere sincronizzata (all'interno di un worker e tra tutti i lavoratori) con le informazioni più recenti sugli shard provenienti dal flusso di dati mentre l'applicazione consumer è in esecuzione. KCL Per ulteriori informazioni, consulta Utilizzate una tabella di leasing per tenere traccia degli shard elaborati dall'applicazione consumer KCL.
-
Processore di registrazione: la logica che definisce il modo in cui l'applicazione KCL consumer elabora i dati che riceve dai flussi di dati. In fase di esecuzione, un'istanza dell'applicazione KCL consumer crea un'istanza di un worker, che crea un'istanza di un record processor per ogni shard a cui appartiene un lease.
Utilizzate una tabella di leasing per tenere traccia degli shard elaborati dall'applicazione consumer KCL
Argomenti
Cos'è una tabella di leasing
Per ogni KCL applicazione Amazon Kinesis Data Streams, utilizza una tabella di lease unica (archiviata in una tabella Amazon DynamoDB) per tenere traccia degli shard KDS in un flusso di dati che vengono affittati ed elaborati dai lavoratori dell'applicazione consumer. KCL
Importante
KCLutilizza il nome dell'applicazione consumer per creare il nome della tabella di leasing utilizzata da questa applicazione consumer, pertanto il nome di ogni applicazione consumer deve essere univoco.
È possibile visualizzare la tabella di lease utilizzando la console Amazon DynamoDB mentre l'applicazione è in esecuzione.
Se la tabella di leasing per l'applicazione KCL consumer non esiste all'avvio dell'applicazione, uno dei lavoratori crea la tabella di leasing per questa applicazione.
Importante
Il tuo account sarà addebitato per i costi associati alla tabella DynamoDB, oltre ai costi associati al flusso di dati Kinesis stesso.
Ogni riga della tabella di lease rappresenta una partizione che viene elaborata dalla tua applicazione consumer. Se l'applicazione KCL consumer elabora solo un flusso di dati, la leaseKey
chiave hash per la tabella di leasing è lo shard ID. Se lo seiElabora più flussi di dati con la stessa applicazione consumer KCL 2.x for Java, allora la struttura di è leaseKey simile a questa:. account-id:StreamName:streamCreationTimestamp:ShardId
Ad esempio 111111111:multiStreamTest-1:12345:shardId-000000000336
.
Oltre all'ID dello shard, ogni riga include anche i seguenti dati:
-
checkpoint: il numero di sequenza di checkpoint più recente per lo shard. Questo valore è univoco per tutte le partizioni nel flusso di dati.
-
checkpointSubSequenceNumero: quando si utilizza la funzione di aggregazione della Kinesis Producer Library, si tratta di un'estensione del checkpoint che tiene traccia dei record dei singoli utenti all'interno del record Kinesis.
-
leaseCounter: Utilizzato per il controllo delle versioni del leasing in modo che i lavoratori possano rilevare che il contratto di locazione è stato preso da un altro lavoratore.
-
leaseKey: un identificatore univoco per un contratto di locazione. Ogni lease è specifico di una partizione nel flusso di dati ed è detenuto da un worker alla volta.
-
leaseOwner: Il lavoratore titolare del contratto di locazione.
-
ownerSwitchesSinceCheckpoint: quante volte questo contratto di locazione ha cambiato lavoratori dall'ultima volta che è stato scritto un checkpoint.
-
parentShardId: Utilizzato per garantire che lo shard principale sia completamente elaborato prima che inizi l'elaborazione sui frammenti secondari. In questo modo, ci si assicura che i record siano elaborati nello stesso ordine in cui sono stati introdotti nel flusso.
-
hashrange: utilizzato dalla
PeriodicShardSyncManager
per eseguire sincronizzazioni periodiche per trovare le partizioni mancanti nella tabella di lease e creare lease per esse, se necessario.Nota
Questi dati sono presenti nella tabella di lease per ogni shard a partire da 1.14 e 2.3. KCL KCL Per ulteriori informazioni su
PeriodicShardSyncManager
e sulla sincronizzazione periodica tra lease e partizioni, consulta Come viene sincronizzata una tabella di leasing con gli shard in un flusso di dati Kinesis. -
childshards: utilizzato da
LeaseCleanupManager
per esaminare lo stato di elaborazione della partizione secondaria e decidere se la partizione principale può essere eliminata dalla tabella di lease.Nota
Questi dati sono presenti nella tabella di leasing per ogni shard a partire da 1.14 e 2.3. KCL KCL
-
shardID: l'ID della partizione.
Nota
Questi dati sono presenti nella tabella dei lease solo se sei Elabora più flussi di dati con la stessa applicazione consumer KCL 2.x for Java. Questa funzionalità è supportata solo nella versione KCL 2.x per Java, a partire dalla versione KCL 2.3 per Java e versioni successive.
-
nome del flusso: l'identificatore del flusso di dati nel seguente formato:
account-id:StreamName:streamCreationTimestamp
.Nota
Questi dati sono presenti nella tabella dei lease solo se sei Elabora più flussi di dati con la stessa applicazione consumer KCL 2.x for Java. Questa funzionalità è supportata solo nella versione KCL 2.x per Java, a partire dalla versione KCL 2.3 per Java e versioni successive.
Prestazioni
Se l'applicazione Flusso di dati Amazon Kinesis riceve eccezioni di velocità di trasmissione effettiva assegnata, dovrai aumentare la velocità di trasmissione effettiva assegnata per la tabella DynamoDB. La tabella KCL viene creata con un throughput assegnato di 10 letture al secondo e 10 scritture al secondo, ma questo potrebbe non essere sufficiente per l'applicazione in uso. Ad esempio, se la tua applicazione Flusso di dati Amazon Kinesis crea frequentemente dei checkpoint o opera in un flusso che è composto da molte partizioni, potrebbe essere necessaria una velocità di trasmissione effettiva maggiore.
Per informazioni sulla velocità di trasmissione effettiva assegnata in DynamoDB, consulta Modalità capacità di lettura/scrittura e Utilizzo di tabelle e dati nella Guida per gli sviluppatori di Amazon DynamoDB.
Come viene sincronizzata una tabella di leasing con gli shard in un flusso di dati Kinesis
Gli addetti alle applicazioni KCL destinate ai consumatori utilizzano i leasing per elaborare gli shard provenienti da un determinato flusso di dati. Le informazioni su quale worker sta eseguendo il lease di una partizione in un dato momento vengono archiviate in una tabella di lease. La tabella di leasing deve rimanere sincronizzata con le informazioni più recenti sugli shard provenienti dal flusso di dati mentre l'applicazione KCL consumer è in esecuzione. KCLsincronizza la tabella di lease con le informazioni sugli shard acquisite dal servizio Kinesis Data Streams durante l'avvio dell'applicazione consumer (quando l'applicazione consumer viene inizializzata o riavviata) e anche ogni volta che uno shard in fase di elaborazione raggiunge il termine (resharding). In altre parole, i worker o un'applicazione KCL consumer vengono sincronizzati con il flusso di dati che stanno elaborando durante il bootstrap iniziale dell'applicazione consumer e ogni volta che l'applicazione consumer incontra un evento di reshard del flusso di dati.
Argomenti
Sincronizzazione in versione 1.0 - 1.13 e 2.0 KCL - 2.2 KCL
Nelle versioni KCL 1.0 - 1.13 e KCL 2.0 - 2.2, durante l'avvio dell'applicazione consumer e anche durante ogni evento di reshard del flusso di dati, KCL sincronizza la tabella di lease con le informazioni sugli shard acquisite dal servizio Kinesis Data Streams richiamando o il discovery. ListShards
DescribeStream
APIs In tutte le KCL versioni sopra elencate, ogni worker di un'applicazione KCL consumer completa i seguenti passaggi per eseguire il processo di sincronizzazione lease/shard durante l'avvio dell'applicazione consumer e in occasione di ogni evento stream reshard:
-
Recupera tutte le partizioni per i dati elaborati dal flusso
-
Recupera tutte i lease delle partizioni dalla tabella di lease
-
Filtra ogni partizione aperta che non ha un lease nella tabella di lease
-
Itera su tutte le partizioni aperte trovate e per ogni partizione aperta senza un elemento principale aperto:
-
Attraversa l'albero gerarchico lungo il percorso dei suoi antenati per determinare se la partizione è un discendente. Una partizione è considerata un discendente se una partizione antenata è in fase di elaborazione (la voce di lease relativa alla partizione antenata esiste nella tabella di lease) o se è necessario elaborare una partizione antenata (ad esempio, se la posizione iniziale è
TRIM_HORIZON
oAT_TIMESTAMP
) -
Se lo shard aperto nel contesto è un discendente, KCL controlla lo shard in base alla posizione iniziale e crea dei leasing per i relativi genitori, se necessario
-
Sincronizzazione in 2.x, a KCL partire dalla versione 2.3 e successive KCL
A partire dalle ultime versioni supportate di KCL 2.x (KCL2.3) e successive, la libreria ora supporta le seguenti modifiche al processo di sincronizzazione. Queste modifiche alla sincronizzazione lease/shard riducono significativamente il numero di API chiamate effettuate dalle applicazioni KCL consumer al servizio Kinesis Data Streams e ottimizzano la gestione del leasing nell'applicazione consumer. KCL
-
Durante l'avvio dell'applicazione, se la tabella di lease è vuota, KCL utilizza l'opzione di filtro
ListShard
API dell'applicazione (il parametro di richiestaShardFilter
opzionale) per recuperare e creare leasing solo per un'istantanea degli shard aperti nel momento specificato dal parametro.ShardFilter
IlShardFilter
parametro consente di filtrare la risposta di.ListShards
API L'unica proprietà richiesta del parametroShardFilter
èType
. KCLutilizza la proprietàType
filter e i seguenti valori validi per identificare e restituire un'istantanea degli shard aperti che potrebbero richiedere nuovi leasing:-
AT_TRIM_HORIZON
: la risposta include tutte le partizioni che erano aperte inTRIM_HORIZON
. -
AT_LATEST
: la risposta include solo le partizioni del flusso di dati correntemente aperte. -
AT_TIMESTAMP
: la risposta include tutte le partizioni il cui timestamp di inizio è inferiore o uguale al timestamp specificato e il timestamp di fine è maggiore o uguale al timestamp specificato o sono ancora aperte.
ShardFilter
viene utilizzato durante la creazione di lease per una tabella di lease vuota per inizializzare i lease per uno snapshot delle partizioni specificate inRetrievalConfig#initialPositionInStreamExtended
.Per ulteriori informazioni su
ShardFilter
, consulta https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html. -
-
Invece che tutti i lavoratori eseguano la sincronizzazione. lease/shard synchronization to keep the lease table up to date with the latest shards in the data stream, a single elected worker leader performs the lease/shard
-
KCL2.3 utilizza il parametro
ChildShards
return diGetRecords
and theSubscribeToShard
APIs per eseguire la sincronizzazione lease/shard che avviene per i frammenti chiusi, consentendo a un KCL lavoratore di creare leasing soloSHARD_END
per i frammenti secondari dello shard che ha terminato l'elaborazione. Per la condivisione tra le applicazioni consumer, questa ottimizzazione della sincronizzazione lease/shard utilizza il parametro di.ChildShards
GetRecords
API Per le applicazioni consumer con throughput dedicato (fan-out avanzato), questa ottimizzazione della sincronizzazione lease/shard utilizza il parametro di.ChildShards
SubscribeToShard
API Per ulteriori informazioni, vedere, e. GetRecordsSubscribeToShardsChildShard -
Con le modifiche di cui sopra, il comportamento di KCL sta passando dal modello di tutti i lavoratori che imparano a conoscere tutti i frammenti esistenti al modello dei lavoratori che apprendono solo i bambini, frammenti dei frammenti di frammenti di proprietà di ciascun lavoratore. Pertanto, oltre alla sincronizzazione che avviene durante gli eventi di bootstraping e reshard delle applicazioni consumer, KCL ora esegue anche ulteriori scansioni periodiche degli shard/lease per identificare eventuali potenziali buchi nella tabella di lease (in altre parole, per conoscere tutti i nuovi shard) per garantire l'elaborazione dell'intervallo hash completo del flusso di dati e creare contratti di leasing, se necessario.
PeriodicShardSyncManager
è il componente responsabile dell'esecuzione di scansioni periodiche di lease/shard.Nella versione KCL 2.3, sono disponibili nuove opzioni di configurazione da configurare in:
PeriodicShardSyncManager
LeaseManagementConfig
Nome Valore predefinito Descrizione leasesRecoveryAuditorExecutionFrequencyMillis 120.000 (2 minuti)
La frequenza (in millisecondi) del lavoro del revisore per la ricerca di lease parziali nella tabella di lease. Se il revisore rileva un buco nei lease relativi a uno stream, attiva la sincronizzazione delle partizioni in base a
leasesRecoveryAuditorInconsistencyConfidenceThreshold
.leasesRecoveryAuditorInconsistencyConfidenceThreshold 3
Soglia di confidenza per il lavoro periodico del revisore volto a determinare se i lease per un flusso di dati nella tabella di lease non sono coerenti. Se il revisore rileva lo stesso insieme di incongruenze consecutivamente per un flusso di dati per questo numero di volte, attiva una sincronizzazione delle partizioni.
Ora vengono inoltre emesse nuove CloudWatch metriche per monitorare lo stato di.
PeriodicShardSyncManager
Per ulteriori informazioni, consulta PeriodicShardSyncManager. -
Inclusa un'ottimizzazione a
HierarchicalShardSyncer
per creare lease solo per un livello di partizioni.
Sincronizzazione in KCL 1.x, a partire dalla versione 1.14 e successive KCL
A partire dalle ultime versioni supportate di KCL 1.x (KCL1.14) e successive, la libreria ora supporta le seguenti modifiche al processo di sincronizzazione. Queste modifiche alla sincronizzazione lease/shard riducono significativamente il numero di API chiamate effettuate dalle applicazioni KCL consumer al servizio Kinesis Data Streams e ottimizzano la gestione del leasing nell'applicazione consumer. KCL
-
Durante l'avvio dell'applicazione, se la tabella di lease è vuota, KCL utilizza l'opzione di filtro
ListShard
API dell'applicazione (il parametro di richiestaShardFilter
opzionale) per recuperare e creare leasing solo per un'istantanea degli shard aperti nel momento specificato dal parametro.ShardFilter
IlShardFilter
parametro consente di filtrare la risposta di.ListShards
API L'unica proprietà richiesta del parametroShardFilter
èType
. KCLutilizza la proprietàType
filter e i seguenti valori validi per identificare e restituire un'istantanea degli shard aperti che potrebbero richiedere nuovi leasing:-
AT_TRIM_HORIZON
: la risposta include tutte le partizioni che erano aperte inTRIM_HORIZON
. -
AT_LATEST
: la risposta include solo le partizioni del flusso di dati correntemente aperte. -
AT_TIMESTAMP
: la risposta include tutte le partizioni il cui timestamp di inizio è inferiore o uguale al timestamp specificato e il timestamp di fine è maggiore o uguale al timestamp specificato o sono ancora aperte.
ShardFilter
viene utilizzato durante la creazione di lease per una tabella di lease vuota per inizializzare i lease per uno snapshot delle partizioni specificate inKinesisClientLibConfiguration#initialPositionInStreamExtended
.Per ulteriori informazioni su
ShardFilter
, consulta https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html. -
-
Invece che tutti i lavoratori eseguano la sincronizzazione. lease/shard synchronization to keep the lease table up to date with the latest shards in the data stream, a single elected worker leader performs the lease/shard
-
KCL1.14 utilizza il parametro
ChildShards
return diGetRecords
and theSubscribeToShard
APIs per eseguire la sincronizzazione lease/shard che avviene per i frammenti chiusi, consentendo a un KCL lavoratore di creare leasing soloSHARD_END
per i frammenti secondari dello shard che ha terminato l'elaborazione. Per ulteriori informazioni, vedere e. GetRecordsChildShard -
Con le modifiche di cui sopra, il comportamento di KCL sta passando dal modello di tutti i lavoratori che imparano a conoscere tutti i frammenti esistenti al modello dei lavoratori che apprendono solo i bambini, frammenti dei frammenti di frammenti di proprietà di ciascun lavoratore. Pertanto, oltre alla sincronizzazione che avviene durante gli eventi di bootstraping e reshard delle applicazioni consumer, KCL ora esegue anche ulteriori scansioni periodiche degli shard/lease per identificare eventuali potenziali buchi nella tabella di lease (in altre parole, per conoscere tutti i nuovi shard) per garantire l'elaborazione dell'intervallo hash completo del flusso di dati e creare contratti di leasing, se necessario.
PeriodicShardSyncManager
è il componente responsabile dell'esecuzione di scansioni periodiche di lease/shard.Quando
KinesisClientLibConfiguration#shardSyncStrategyType
è impostato suShardSyncStrategyType.SHARD_END
,PeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThreshold
viene utilizzato per determinare la soglia per il numero di scansioni consecutive contenenti buchi nella tabella di lease, dopodiché imporre la sincronizzazione delle partizioni. QuandoKinesisClientLibConfiguration#shardSyncStrategyType
è impostato suShardSyncStrategyType.PERIODIC
,leasesRecoveryAuditorInconsistencyConfidenceThreshold
viene ignorato.Nella versione KCL 1.14, è disponibile una nuova opzione di configurazione da configurare in:
PeriodicShardSyncManager
LeaseManagementConfig
Nome Valore predefinito Descrizione leasesRecoveryAuditorInconsistencyConfidenceThreshold 3
Soglia di confidenza per il lavoro periodico del revisore volto a determinare se i lease per un flusso di dati nella tabella di lease non sono coerenti. Se il revisore rileva lo stesso insieme di incongruenze consecutivamente per un flusso di dati per questo numero di volte, attiva una sincronizzazione delle partizioni.
Ora vengono inoltre emesse nuove CloudWatch metriche per monitorare lo stato di.
PeriodicShardSyncManager
Per ulteriori informazioni, consulta PeriodicShardSyncManager. -
KCL1.14 ora supporta anche la pulizia del leasing differito. I lease vengono eliminati in modo asincrono da
LeaseCleanupManager
quando viene raggiuntoSHARD_END
o quando una partizione è scaduta, superando il periodo di conservazione del flusso di dati o è stata chiusa a seguito di un'operazione di ripartizionamento.Sono disponibili nuove opzioni di configurazione per configurare
LeaseCleanupManager
.Nome Valore predefinito Descrizione leaseCleanupIntervalMillis 1 minuto
Intervallo in cui eseguire il thread di pulizia dei lease.
completedLeaseCleanupIntervalMillis 5 minuti Intervallo in cui verificare se un lease è stato completato o meno.
garbageLeaseCleanupIntervalMillis 30 minutes Intervallo durante il quale verificare se un lease è inutile (ossia se è stato interrotto oltre il periodo di conservazione del flusso di dati) o meno.
-
Inclusa un'ottimizzazione a
KinesisShardSyncer
per creare lease solo per un livello di partizioni.
Elabora più flussi di dati con la stessa applicazione consumer KCL 2.x for Java
Questa sezione descrive le seguenti modifiche alla versione KCL 2.x per Java che consentono di creare applicazioni KCL consumer in grado di elaborare più di un flusso di dati contemporaneamente.
Importante
L'elaborazione multistream è supportata solo nella versione KCL 2.x per Java, a partire dalla versione KCL 2.3 per Java e versioni successive.
L'elaborazione multistream è NOT supportata per tutti gli altri linguaggi in cui KCL è possibile implementare 2.x.
L'elaborazione multistream è NOT supportata in tutte le versioni di 1.x. KCL
-
MultistreamTracker interfaccia
Per creare un'applicazione consumer in grado di elaborare più flussi contemporaneamente, è necessario implementare una nuova interfaccia denominata MultistreamTracker
. Questa interfaccia include il streamConfigList
metodo che restituisce l'elenco dei flussi di dati e le relative configurazioni che devono essere elaborati dall'KCLapplicazione consumer. Si noti che i flussi di dati in fase di elaborazione possono essere modificati durante il runtime dell'applicazione consumer.streamConfigList
viene chiamato periodicamente dal KCL per conoscere le modifiche nei flussi di dati da elaborare.Il
streamConfigList
metodo compila l'elenco. StreamConfigpackage software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }
Nota che
StreamIdentifier
eInitialPositionInStreamExtended
sono obbligatori, mentreconsumerArn
è facoltativo. È necessario fornire il comandoconsumerArn
solo se si utilizza la versione KCL 2.x per implementare un'applicazione consumer fan-out avanzata.Per ulteriori informazioni su
StreamIdentifier
, vedere https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129. Per creare un StreamIdentifier
, ti consigliamo di creare un'istanza multistream dastreamArn
and the disponibile nella versionestreamCreationEpoch
2.5.0 e successive. Nelle KCL versioni 2.3 e v2.4, che non supportanostreamArm
, crea un'istanza multistream utilizzando il formato.account-id:StreamName:streamCreationTimestamp
Questo formato sarà obsoleto e non sarà più supportato a partire dalla prossima versione principale.MultistreamTracker
include anche una strategia per eliminare i lease di vecchi flussi nella tabella dei lease (formerStreamsLeasesDeletionStrategy
). Si noti che la strategia viene CANNOT modificata durante il runtime dell'applicazione consumer. Per ulteriori informazioni, consultate https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b//amazon-kinesis-client.java src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy -
ConfigsBuilder
è una classe a livello di applicazione che è possibile utilizzare per specificare tutte le impostazioni di configurazione 2.x da utilizzare durante la creazione di un'applicazione consumer. KCL KCL ConfigsBuilder
la classe ora supporta l'interfaccia.MultistreamTracker
Puoi inizializzarli ConfigsBuilder entrambi con il nome dell'unico flusso di dati da cui consumare i record da:/** * Constructor to initialize ConfigsBuilder with StreamName * @param streamName * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.right(streamName); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
Oppure puoi inizializzare ConfigsBuilder con
MultiStreamTracker
se desideri implementare un'applicazione KCL consumer che elabori più flussi contemporaneamente.* Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
-
Con il supporto multistream implementato per l'applicazione KCL consumer, ogni riga della tabella di lease dell'applicazione ora contiene l'ID dello shard e il nome del flusso dei molteplici flussi di dati elaborati dall'applicazione.
-
Quando viene implementato il supporto multistream per la tua applicazione KCL consumer, assume la seguente struttura:. leaseKey
account-id:StreamName:streamCreationTimestamp:ShardId
Ad esempio111111111:multiStreamTest-1:12345:shardId-000000000336
.Importante
Quando la tua applicazione KCL consumer esistente è configurata per elaborare solo un flusso di dati, leaseKey (che è la chiave hash per la tabella di leasing) è lo shard ID. Se riconfigurate questa applicazione KCL consumer esistente per elaborare più flussi di dati, si interrompe la tabella di lease, perché con il supporto multistream, la struttura deve essere la seguente:. leaseKey
account-id:StreamName:StreamCreationTimestamp:ShardId
Usa il registro con lo schema KCL AWS Glue
Puoi integrare i tuoi flussi di dati Kinesis con lo Schema Registry. AWS Glue Lo AWS Glue Schema Registry ti consente di scoprire, controllare ed evolvere centralmente gli schemi, garantendo al contempo che i dati prodotti siano convalidati continuamente da uno schema registrato. Uno schema definisce la struttura e il formato di un registro di dati. Uno schema è una specifica con versioni per la pubblicazione, il consumo o l'archiviazione dei dati in modo affidabile. Lo AWS Glue Schema Registry consente di migliorare la qualità e la governance end-to-end dei dati all'interno delle applicazioni di streaming. Per ulteriori informazioni, consulta Registro degli schemi di AWS Glue. Uno dei modi per configurare questa integrazione è tramite Java. KCL
Importante
Attualmente, l'integrazione tra Kinesis Data AWS Glue Streams e Schema Registry è supportata solo per i flussi di dati Kinesis KCL che utilizzano utenti 2.3 implementati in Java. Il supporto multilingue non viene fornito. KCLI consumatori 1.0 non sono supportati. KCLI consumatori 2.x precedenti alla KCL 2.3 non sono supportati.
Per istruzioni dettagliate su come configurare l'integrazione di Kinesis Data Streams con Schema Registry KCL utilizzando il, consulta la sezione «Interazione con i dati KPL utilizzando KCL le librerie/» in Caso d'uso: integrazione di Amazon Kinesis Data Streams con il registro dello schema Glue. AWS