Esegui la migrazione dei consumatori da 1.x a 2.x KCL KCL - Flusso di dati Amazon Kinesis

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Esegui la migrazione dei consumatori da 1.x a 2.x KCL KCL

Nota

Le versioni 1.x e 2.x di Kinesis Client Library (KCL) sono obsolete. Consigliamo di effettuare la migrazione alla KCLversione 3.x, che offre prestazioni migliorate e nuove funzionalità. Per la KCL documentazione e la guida alla migrazione più recenti, consulta. Usa la libreria client Kinesis

Questo argomento spiega le differenze tra le versioni 1.x e 2.x della Kinesis Client Library (). KCL Mostra anche come migrare il tuo utente dalla versione 1.x alla versione 2.x di. KCL Dopo la migrazione, il client avvierà l'elaborazione dei record dall'ultimo punto di controllo verificato.

La versione 2.0 di KCL introduce le seguenti modifiche all'interfaccia:

KCLModifiche all'interfaccia
KCLInterfaccia 1.x KCLInterfaccia 2.0
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor software.amazon.kinesis.processor.ShardRecordProcessor
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory software.amazon.kinesis.processor.ShardRecordProcessorFactory
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware Piegata in software.amazon.kinesis.processor.ShardRecordProcessor

Migrazione del processore di registrazione

L'esempio seguente mostra un processore di registrazione implementato per KCL 1.x:

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { // // Process records, and possibly checkpoint // } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
Per migrare la classe dell'elaboratore di record
  1. Modifica le interfacce da com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor e com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware verso software.amazon.kinesis.processor.ShardRecordProcessor, come segue:

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import software.amazon.kinesis.processor.ShardRecordProcessor; // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { public class TestRecordProcessor implements ShardRecordProcessor {
  2. Aggiorna le istruzioni import per i metodi initialize e processRecords.

    // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
  3. Sostituisci il metodo shutdown con i seguenti nuovi metodi: leaseLost, shardEnded e shutdownRequested.

    // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }

Segue la versione aggiornata della classe dell'elaboratore di record.

package com.amazonaws.kcl; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.ShardRecordProcessor; public class TestRecordProcessor implements ShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }

Migrare la fabbrica del processore di registrazione

La fabbrica dell'elaboratore di record è responsabile per la creazione di elaboratori di record quando un lease è acquisito. Di seguito è riportato un esempio di fabbrica KCL 1.x.

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class TestRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new TestRecordProcessor(); } }
Per migrare la fabbrica dell'elaboratore di record
  1. Modifica l'interfaccia implementata da com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory a software.amazon.kinesis.processor.ShardRecordProcessorFactory, come segue.

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
  2. Modifica la firma di ritorno per createProcessor.

    // public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {

Di seguito è riportato un esempio di fabbrica di elaboratore di record in 2.0:

package com.amazonaws.kcl; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class TestRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new TestRecordProcessor(); } }

Migra il lavoratore

Nella versione 2.0 diKCL, una nuova classe, chiamataScheduler, sostituisce la Worker classe. Di seguito è riportato un esempio di worker KCL 1.x.

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();
Per migrare il lavoratore
  1. Modifica la dichiarazione import per la classe Worker nelle dichiarazioni di importazione delle classi Scheduler e ConfigsBuilder.

    // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
  2. Crea ConfigsBuilder e Scheduler come mostrato nell'esempio seguente.

    Si consiglia di utilizzare KinesisClientUtil per creare KinesisAsyncClient e configurare maxConcurrency in KinesisAsyncClient.

    Importante

    Il client Amazon Kinesis potrebbe presentare un aumento significativo della latenza, a meno che non venga configurato KinesisAsyncClient per avere una maxConcurrency sufficientemente alta per consentire tutti i canoni più ulteriori utilizzi di KinesisAsyncClient.

    import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... Region region = Region.AP_NORTHEAST_2; KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region)); DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory()); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() );

Configurazione del client Amazon Kinesis

Con il rilascio 2.0 della Kinesis Client Library, la configurazione del client si è spostata da una singola classe di configurazione (KinesisClientLibConfiguration) a sei classi di configurazione. La tabella seguente descrive la migrazione.

Campi di configurazione e relative nuove classi
Campo originale Nuova classe di configurazione Descrizione
applicationName ConfigsBuilder Il nome di questa KCL applicazione. Utilizzato come predefinito per tableName e consumerName.
tableName ConfigsBuilder Consente di ignorare il nome della tabella utilizzato per la tabella di lease di Amazon DynamoDB.
streamName ConfigsBuilder Il nome del flusso dal quale l'applicazione elabora i record.
kinesisEndpoint ConfigsBuilder Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni.
dynamoDBEndpoint ConfigsBuilder Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni.
initialPositionInStreamExtended RetrievalConfig La posizione nello shard da cui KCL inizia a recuperare i record, a partire dall'esecuzione iniziale dell'applicazione.
kinesisCredentialsProvider ConfigsBuilder Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni.
dynamoDBCredentialsProvider ConfigsBuilder Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni.
cloudWatchCredentialsProvider ConfigsBuilder Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni.
failoverTimeMillis LeaseManagementConfig Il numero di millisecondi che devono passare prima di poter considerare un proprietario di lease come fallito.
workerIdentifier ConfigsBuilder Un identificatore univoco che rappresenta la creazione dell'elaboratore di applicazione. Deve essere univoco.
shardSyncIntervalMillis LeaseManagementConfig Il periodo di tempo tra le chiamate di sincronizzazione dello shard.
maxRecords PollingConfig Consente di impostare il numero massimo di record restituiti da Kinesis.
idleTimeBetweenReadsInMillis CoordinatorConfig Questa opzione è stata eliminata. Vedi rimozione tempo di inattività.
callProcessRecordsEvenForEmptyRecordList ProcessorConfig Quando impostato, l'elaboratore di record viene chiamato anche quando nessun record è stato fornito da .
parentShardPollIntervalMillis CoordinatorConfig Con quale frequenza un elaboratore di record deve eseguire il polling per vedere se il shard padre è stata completato.
cleanupLeasesUponShardCompletion LeaseManagementConfig Quando impostati, i lease vengono rimossi non appena i lease figlio hanno iniziato l'elaborazione.
ignoreUnexpectedChildShards LeaseManagementConfig Quando impostato, i shard figlio che hanno un shard aperto vengono ignorati. Questo è principalmente per DynamoDB Streams.
kinesisClientConfig ConfigsBuilder Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni.
dynamoDBClientConfig ConfigsBuilder Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni.
cloudWatchClientConfig ConfigsBuilder Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni.
taskBackoffTimeMillis LifecycleConfig Il tempo di attesa per riprovare operazioni non riuscite.
metricsBufferTimeMillis MetricsConfig Controlla la pubblicazione delle CloudWatch metriche.
metricsMaxQueueSize MetricsConfig Controlla la pubblicazione CloudWatch delle metriche.
metricsLevel MetricsConfig Controlla la pubblicazione CloudWatch delle metriche.
metricsEnabledDimensions MetricsConfig Controlla la pubblicazione CloudWatch delle metriche.
validateSequenceNumberBeforeCheckpointing CheckpointConfig Questa opzione è stata eliminata. Vedi la convalida del numero di sequenza del checkpoint.
regionName ConfigsBuilder Questa opzione è stata eliminata. Vedi la rimozione della configurazione client.
maxLeasesForWorker LeaseManagementConfig Il numero massimo di lease che una singola istanza dell'applicazione deve accettare.
maxLeasesToStealAtOneTime LeaseManagementConfig Il numero massimo di lease che un'applicazione deve tentare di intercettare simultaneamente.
initialLeaseTableReadCapacity LeaseManagementConfig La IOPs lettura DynamoDB utilizzata se la Kinesis Client Library deve creare una nuova tabella di lease DynamoDB.
initialLeaseTableWriteCapacity LeaseManagementConfig La IOPs lettura DynamoDB utilizzata se la Kinesis Client Library deve creare una nuova tabella di lease DynamoDB.
initialPositionInStreamExtended LeaseManagementConfig La posizione iniziale nel flusso nella quale l'applicazione dovrebbe iniziare. Questo viene utilizzato soltanto durante la creazione del lease iniziale.
skipShardSyncAtWorkerInitializationIfLeasesExist CoordinatorConfig Disabilita la sincronizzazione dei dati shard se la tabella di lease contiene lease esistenti. TODO: -438 KinesisEco
shardPrioritization CoordinatorConfig Quale prioritizzazione shard utilizzare.
shutdownGraceMillis N/D Questa opzione è stata eliminata. Vedi MultiLang Traslochi.
timeoutInSeconds N/D Questa opzione è stata eliminata. Vedi MultiLang Rimozioni.
retryGetRecordsInSeconds PollingConfig Configura il ritardo tra i GetRecords tentativi di errore.
maxGetRecordsThreadPool PollingConfig La dimensione del pool di thread utilizzato per. GetRecords
maxLeaseRenewalThreads LeaseManagementConfig Controlla le dimensioni del pool di thread di rinnovo del lease. Quanto maggiori sono i lease che può richiedere l'applicazione, tanto più grande deve essere questo pool.
recordsFetcherFactory PollingConfig Consente di sostituire la factory utilizzata per creare fetcher che recuperano dai flussi.
logWarningForTaskAfterMillis LifecycleConfig Quanto tempo bisogna attendere prima che venga registrato un avviso se un'attività non è stata completata.
listShardsBackoffTimeInMillis RetrievalConfig Il numero di millisecondi di attesa tra le chiamate in ListShards quando si verificano errori.
maxListShardsRetryAttempts RetrievalConfig Il numero massimo di volte che ListShards effettua nuovi tentativi prima di desistere.

Rimozione dei tempi di inattività

Nella versione 1.x diKCL, idleTimeBetweenReadsInMillis corrispondeva a due quantità:

  • La quantità di tempo tra controlli dell'attività. È ora possibile configurare questo periodo tra attività impostando CoordinatorConfig#shardConsumerDispatchPollIntervalMillis.

  • La quantità di tempo di sospensione quando non è stato restituito alcun record da . Nella versione 2.0, nel fan-out ottimizzato, i record vengono inviati da chi li ha recuperati. L'attività sul consumo di shard avviene solo quando arriva una richiesta di push.

Rimozioni della configurazione del client

Nella versione 2.0, KCL non crea più client. Spetta all'utente fornire un client valido. Con questa modifica, tutti i parametri di configurazione che controllavano la creazione del client sono stati rimossi. Se hai bisogno di questi parametri, puoi impostarli sui client prima di fornire i client a ConfigsBuilder.

Campo rimosso Configurazione equivalente
kinesisEndpoint Configura il SDK KinesisAsyncClient con l'endpoint preferito:KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis endpoint>")).build().
dynamoDBEndpoint Configura il SDK DynamoDbAsyncClient con endpoint preferito:. DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb endpoint>")).build()
kinesisClientConfig Configura SDK KinesisAsyncClient con la configurazione necessaria:KinesisAsyncClient.builder().overrideConfiguration(<your configuration>).build().
dynamoDBClientConfig Configura SDK DynamoDbAsyncClient con la configurazione necessaria:DynamoDbAsyncClient.builder().overrideConfiguration(<your configuration>).build().
cloudWatchClientConfig Configura SDK CloudWatchAsyncClient con la configurazione necessaria:CloudWatchAsyncClient.builder().overrideConfiguration(<your configuration>).build().
regionName Configura SDK con la regione preferita. È lo stesso per tutti i SDK client. Ad esempio KinesisAsyncClient.builder().region(Region.US_WEST_2).build().