Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Esegui la migrazione dei consumatori da 1.x a 2.x KCL KCL
Nota
Le versioni 1.x e 2.x di Kinesis Client Library (KCL) sono obsolete. Consigliamo di effettuare la migrazione alla KCLversione 3.x, che offre prestazioni migliorate e nuove funzionalità. Per la KCL documentazione e la guida alla migrazione più recenti, consulta. Usa la libreria client Kinesis
Questo argomento spiega le differenze tra le versioni 1.x e 2.x della Kinesis Client Library (). KCL Mostra anche come migrare il tuo utente dalla versione 1.x alla versione 2.x di. KCL Dopo la migrazione, il client avvierà l'elaborazione dei record dall'ultimo punto di controllo verificato.
La versione 2.0 di KCL introduce le seguenti modifiche all'interfaccia:
KCLInterfaccia 1.x | KCLInterfaccia 2.0 |
---|---|
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor |
software.amazon.kinesis.processor.ShardRecordProcessor |
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory |
software.amazon.kinesis.processor.ShardRecordProcessorFactory |
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware |
Piegata in software.amazon.kinesis.processor.ShardRecordProcessor |
Argomenti
Migrazione del processore di registrazione
L'esempio seguente mostra un processore di registrazione implementato per KCL 1.x:
package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { // // Process records, and possibly checkpoint // } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
Per migrare la classe dell'elaboratore di record
-
Modifica le interfacce da
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor
ecom.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware
versosoftware.amazon.kinesis.processor.ShardRecordProcessor
, come segue:// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import software.amazon.kinesis.processor.ShardRecordProcessor; // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { public class TestRecordProcessor implements ShardRecordProcessor {
-
Aggiorna le istruzioni
import
per i metodiinitialize
eprocessRecords
.// import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
-
Sostituisci il metodo
shutdown
con i seguenti nuovi metodi:leaseLost
,shardEnded
eshutdownRequested
.// @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }
Segue la versione aggiornata della classe dell'elaboratore di record.
package com.amazonaws.kcl; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.ShardRecordProcessor; public class TestRecordProcessor implements ShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }
Migrare la fabbrica del processore di registrazione
La fabbrica dell'elaboratore di record è responsabile per la creazione di elaboratori di record quando un lease è acquisito. Di seguito è riportato un esempio di fabbrica KCL 1.x.
package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class TestRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new TestRecordProcessor(); } }
Per migrare la fabbrica dell'elaboratore di record
-
Modifica l'interfaccia implementata da
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
asoftware.amazon.kinesis.processor.ShardRecordProcessorFactory
, come segue.// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
-
Modifica la firma di ritorno per
createProcessor
.// public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {
Di seguito è riportato un esempio di fabbrica di elaboratore di record in 2.0:
package com.amazonaws.kcl; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class TestRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new TestRecordProcessor(); } }
Migra il lavoratore
Nella versione 2.0 diKCL, una nuova classe, chiamataScheduler
, sostituisce la Worker
classe. Di seguito è riportato un esempio di worker KCL 1.x.
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();
Per migrare il lavoratore
-
Modifica la dichiarazione
import
per la classeWorker
nelle dichiarazioni di importazione delle classiScheduler
eConfigsBuilder
.// import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
-
Crea
ConfigsBuilder
eScheduler
come mostrato nell'esempio seguente.Si consiglia di utilizzare
KinesisClientUtil
per creareKinesisAsyncClient
e configuraremaxConcurrency
inKinesisAsyncClient
.Importante
Il client Amazon Kinesis potrebbe presentare un aumento significativo della latenza, a meno che non venga configurato
KinesisAsyncClient
per avere unamaxConcurrency
sufficientemente alta per consentire tutti i canoni più ulteriori utilizzi diKinesisAsyncClient
.import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... Region region = Region.AP_NORTHEAST_2; KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region)); DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory()); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() );
Configurazione del client Amazon Kinesis
Con il rilascio 2.0 della Kinesis Client Library, la configurazione del client si è spostata da una singola classe di configurazione (KinesisClientLibConfiguration
) a sei classi di configurazione. La tabella seguente descrive la migrazione.
Campo originale | Nuova classe di configurazione | Descrizione |
---|---|---|
applicationName |
ConfigsBuilder |
Il nome di questa KCL applicazione. Utilizzato come predefinito per tableName e consumerName . |
tableName |
ConfigsBuilder |
Consente di ignorare il nome della tabella utilizzato per la tabella di lease di Amazon DynamoDB. |
streamName |
ConfigsBuilder |
Il nome del flusso dal quale l'applicazione elabora i record. |
kinesisEndpoint |
ConfigsBuilder |
Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni. |
dynamoDBEndpoint |
ConfigsBuilder |
Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni. |
initialPositionInStreamExtended |
RetrievalConfig |
La posizione nello shard da cui KCL inizia a recuperare i record, a partire dall'esecuzione iniziale dell'applicazione. |
kinesisCredentialsProvider |
ConfigsBuilder |
Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni. |
dynamoDBCredentialsProvider |
ConfigsBuilder |
Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni. |
cloudWatchCredentialsProvider |
ConfigsBuilder |
Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni. |
failoverTimeMillis |
LeaseManagementConfig | Il numero di millisecondi che devono passare prima di poter considerare un proprietario di lease come fallito. |
workerIdentifier |
ConfigsBuilder |
Un identificatore univoco che rappresenta la creazione dell'elaboratore di applicazione. Deve essere univoco. |
shardSyncIntervalMillis |
LeaseManagementConfig | Il periodo di tempo tra le chiamate di sincronizzazione dello shard. |
maxRecords |
PollingConfig |
Consente di impostare il numero massimo di record restituiti da Kinesis. |
idleTimeBetweenReadsInMillis |
CoordinatorConfig |
Questa opzione è stata eliminata. Vedi rimozione tempo di inattività. |
callProcessRecordsEvenForEmptyRecordList |
ProcessorConfig |
Quando impostato, l'elaboratore di record viene chiamato anche quando nessun record è stato fornito da . |
parentShardPollIntervalMillis |
CoordinatorConfig |
Con quale frequenza un elaboratore di record deve eseguire il polling per vedere se il shard padre è stata completato. |
cleanupLeasesUponShardCompletion |
LeaseManagementConfig |
Quando impostati, i lease vengono rimossi non appena i lease figlio hanno iniziato l'elaborazione. |
ignoreUnexpectedChildShards |
LeaseManagementConfig |
Quando impostato, i shard figlio che hanno un shard aperto vengono ignorati. Questo è principalmente per DynamoDB Streams. |
kinesisClientConfig |
ConfigsBuilder |
Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni. |
dynamoDBClientConfig |
ConfigsBuilder |
Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni. |
cloudWatchClientConfig |
ConfigsBuilder |
Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni. |
taskBackoffTimeMillis |
LifecycleConfig |
Il tempo di attesa per riprovare operazioni non riuscite. |
metricsBufferTimeMillis |
MetricsConfig |
Controlla la pubblicazione delle CloudWatch metriche. |
metricsMaxQueueSize |
MetricsConfig |
Controlla la pubblicazione CloudWatch delle metriche. |
metricsLevel |
MetricsConfig |
Controlla la pubblicazione CloudWatch delle metriche. |
metricsEnabledDimensions |
MetricsConfig |
Controlla la pubblicazione CloudWatch delle metriche. |
validateSequenceNumberBeforeCheckpointing |
CheckpointConfig |
Questa opzione è stata eliminata. Vedi la convalida del numero di sequenza del checkpoint. |
regionName |
ConfigsBuilder |
Questa opzione è stata eliminata. Vedi la rimozione della configurazione client. |
maxLeasesForWorker |
LeaseManagementConfig |
Il numero massimo di lease che una singola istanza dell'applicazione deve accettare. |
maxLeasesToStealAtOneTime |
LeaseManagementConfig |
Il numero massimo di lease che un'applicazione deve tentare di intercettare simultaneamente. |
initialLeaseTableReadCapacity |
LeaseManagementConfig |
La IOPs lettura DynamoDB utilizzata se la Kinesis Client Library deve creare una nuova tabella di lease DynamoDB. |
initialLeaseTableWriteCapacity |
LeaseManagementConfig |
La IOPs lettura DynamoDB utilizzata se la Kinesis Client Library deve creare una nuova tabella di lease DynamoDB. |
initialPositionInStreamExtended |
LeaseManagementConfig | La posizione iniziale nel flusso nella quale l'applicazione dovrebbe iniziare. Questo viene utilizzato soltanto durante la creazione del lease iniziale. |
skipShardSyncAtWorkerInitializationIfLeasesExist |
CoordinatorConfig |
Disabilita la sincronizzazione dei dati shard se la tabella di lease contiene lease esistenti. TODO: -438 KinesisEco |
shardPrioritization |
CoordinatorConfig |
Quale prioritizzazione shard utilizzare. |
shutdownGraceMillis |
N/D | Questa opzione è stata eliminata. Vedi MultiLang Traslochi. |
timeoutInSeconds |
N/D | Questa opzione è stata eliminata. Vedi MultiLang Rimozioni. |
retryGetRecordsInSeconds |
PollingConfig |
Configura il ritardo tra i GetRecords tentativi di errore. |
maxGetRecordsThreadPool |
PollingConfig |
La dimensione del pool di thread utilizzato per. GetRecords |
maxLeaseRenewalThreads |
LeaseManagementConfig |
Controlla le dimensioni del pool di thread di rinnovo del lease. Quanto maggiori sono i lease che può richiedere l'applicazione, tanto più grande deve essere questo pool. |
recordsFetcherFactory |
PollingConfig |
Consente di sostituire la factory utilizzata per creare fetcher che recuperano dai flussi. |
logWarningForTaskAfterMillis |
LifecycleConfig |
Quanto tempo bisogna attendere prima che venga registrato un avviso se un'attività non è stata completata. |
listShardsBackoffTimeInMillis |
RetrievalConfig |
Il numero di millisecondi di attesa tra le chiamate in ListShards quando si verificano errori. |
maxListShardsRetryAttempts |
RetrievalConfig |
Il numero massimo di volte che ListShards effettua nuovi tentativi prima di desistere. |
Rimozione dei tempi di inattività
Nella versione 1.x diKCL, idleTimeBetweenReadsInMillis
corrispondeva a due quantità:
-
La quantità di tempo tra controlli dell'attività. È ora possibile configurare questo periodo tra attività impostando
CoordinatorConfig#shardConsumerDispatchPollIntervalMillis
. -
La quantità di tempo di sospensione quando non è stato restituito alcun record da . Nella versione 2.0, nel fan-out ottimizzato, i record vengono inviati da chi li ha recuperati. L'attività sul consumo di shard avviene solo quando arriva una richiesta di push.
Rimozioni della configurazione del client
Nella versione 2.0, KCL non crea più client. Spetta all'utente fornire un client valido. Con questa modifica, tutti i parametri di configurazione che controllavano la creazione del client sono stati rimossi. Se hai bisogno di questi parametri, puoi impostarli sui client prima di fornire i client a ConfigsBuilder
.
Campo rimosso | Configurazione equivalente |
---|---|
kinesisEndpoint |
Configura il SDK KinesisAsyncClient con l'endpoint preferito:KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis
endpoint>")).build() . |
dynamoDBEndpoint |
Configura il SDK DynamoDbAsyncClient con endpoint preferito:. DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb
endpoint>")).build() |
kinesisClientConfig |
Configura SDK KinesisAsyncClient con la configurazione necessaria:KinesisAsyncClient.builder().overrideConfiguration(<your
configuration>).build() . |
dynamoDBClientConfig |
Configura SDK DynamoDbAsyncClient con la configurazione necessaria:DynamoDbAsyncClient.builder().overrideConfiguration(<your
configuration>).build() . |
cloudWatchClientConfig |
Configura SDK CloudWatchAsyncClient con la configurazione necessaria:CloudWatchAsyncClient.builder().overrideConfiguration(<your
configuration>).build() . |
regionName |
Configura SDK con la regione preferita. È lo stesso per tutti i SDK client. Ad esempio KinesisAsyncClient.builder().region(Region.US_WEST_2).build() . |