Migre a los consumidores de la KCL versión 1.x a la KCL 2.x - Amazon Kinesis Data Streams

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Migre a los consumidores de la KCL versión 1.x a la KCL 2.x

En este tema se explican las diferencias entre las versiones 1.x y 2.x de la biblioteca de clientes de Kinesis (). KCL También le muestra cómo migrar a su consumidor de la versión 1.x a la versión 2.x de. KCL Después de migrar su cliente, comenzará el procesamiento de registros a partir de la última ubicación del punto de comprobación.

La versión 2.0 de KCL introduce los siguientes cambios en la interfaz:

KCLCambios en la interfaz
KCLInterfaz 1.x KCLInterfaz 2.0
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor software.amazon.kinesis.processor.ShardRecordProcessor
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory software.amazon.kinesis.processor.ShardRecordProcessorFactory
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware Plegado en software.amazon.kinesis.processor.ShardRecordProcessor

Migre el procesador de registros

El siguiente ejemplo muestra un procesador de registros implementado para KCL 1.x:

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { // // Process records, and possibly checkpoint // } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
Para migrar la clase del procesador de registros
  1. Cambie las interfaces de com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor y com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware a software.amazon.kinesis.processor.ShardRecordProcessor, tal y como se indica a continuación:

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import software.amazon.kinesis.processor.ShardRecordProcessor; // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { public class TestRecordProcessor implements ShardRecordProcessor {
  2. Actualice las instrucciones import para los métodos initialize y processRecords.

    // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
  3. Sustituya el método shutdown por los métodos nuevos siguientes: leaseLost, shardEnded y shutdownRequested.

    // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }

A continuación, se muestra la versión actualizada de la clase del procesador de registros.

package com.amazonaws.kcl; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.ShardRecordProcessor; public class TestRecordProcessor implements ShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }

Migre la fábrica de procesadores de discos

El generador de procesadores de registros es responsable de la creación de procesadores de registros cuando se adquiere una asignación. El siguiente es un ejemplo de una fábrica KCL 1.x.

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class TestRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new TestRecordProcessor(); } }
Para migrar el generador de procesadores de registros
  1. Cambie la interfaz implementada de com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory a software.amazon.kinesis.processor.ShardRecordProcessorFactory, tal y como se indica a continuación:

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
  2. Cambie la firma de retorno de createProcessor.

    // public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {

A continuación, se muestra un ejemplo de generador de procesadores de registros de la versión 2.0:

package com.amazonaws.kcl; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class TestRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new TestRecordProcessor(); } }

Migre al trabajador

En la versión 2.0 deKCL, una nueva clase, denominadaScheduler, reemplaza a la Worker clase. El siguiente es un ejemplo de un trabajador KCL 1.x.

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();
Para migrar el proceso de trabajo
  1. Cambie la instrucción import para la clase Worker por las instrucciones de importación para las clases Scheduler y ConfigsBuilder.

    // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
  2. Cree un ConfigsBuilder y un Scheduler como se muestra en el ejemplo siguiente.

    Es recomendable que utilice KinesisClientUtil para crear KinesisAsyncClient y para establecer maxConcurrency en KinesisAsyncClient.

    importante

    El cliente de Amazon Kinesis puede experimentar un importante aumento de la latencia, a menos que configure KinesisAsyncClient de forma que el valor de maxConcurrency sea lo suficientemente alto para permitir todas las asignaciones, además de los usos adicionales de KinesisAsyncClient.

    import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... Region region = Region.AP_NORTHEAST_2; KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region)); DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory()); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() );

Configurar el cliente de Amazon Kinesis

Con el lanzamiento de la versión 2.0 de Kinesis Client Library, la configuración del cliente pasa de tener una única clase de configuración (KinesisClientLibConfiguration) a tener seis clases de configuración. En la tabla siguiente, se describe la migración.

Campos de configuración y sus clases nuevas
Campo original Clase de configuración nueva Descripción
applicationName ConfigsBuilder El nombre de la KCL aplicación. Se utiliza de forma predeterminada para tableName y consumerName.
tableName ConfigsBuilder Permite sustituir el nombre de la tabla que se utiliza para la tabla de asignaciones de Amazon DynamoDB.
streamName ConfigsBuilder El nombre de la secuencia cuyos registros procesa de esta aplicación.
kinesisEndpoint ConfigsBuilder Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes.
dynamoDBEndpoint ConfigsBuilder Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes.
initialPositionInStreamExtended RetrievalConfig La ubicación del fragmento desde la que KCL comienza a buscar registros, empezando por la ejecución inicial de la aplicación.
kinesisCredentialsProvider ConfigsBuilder Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes.
dynamoDBCredentialsProvider ConfigsBuilder Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes.
cloudWatchCredentialsProvider ConfigsBuilder Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes.
failoverTimeMillis LeaseManagementConfig El número de milisegundos que deben transcurrir antes de que se pueda considerar que se ha producido un error en el propietario de una asignación.
workerIdentifier ConfigsBuilder Identificador único que representa esta instancia del procesador de aplicaciones. Deben ser único.
shardSyncIntervalMillis LeaseManagementConfig El tiempo entre llamadas de sincronización del fragmento.
maxRecords PollingConfig Permite establecer el número máximo de registros que devuelve Kinesis.
idleTimeBetweenReadsInMillis CoordinatorConfig Esta opción se ha eliminado. Consulte Eliminación del tiempo de inactividad.
callProcessRecordsEvenForEmptyRecordList ProcessorConfig Cuando se establece, se llama al procesador de registros incluso cuando no se proporciona ningún registro de Kinesis.
parentShardPollIntervalMillis CoordinatorConfig Determina la frecuencia con que debería sondear un procesador de registros para ver si el fragmento principal se ha completado.
cleanupLeasesUponShardCompletion LeaseManagementConfig Cuando se establece, se eliminan las asignaciones tan pronto como se inicia el procesamiento del fragmento secundario.
ignoreUnexpectedChildShards LeaseManagementConfig Cuando se establece, los fragmentos secundarios que tienen un fragmento abierto se pasan por alto. Esto es principalmente para DynamoDB Streams.
kinesisClientConfig ConfigsBuilder Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes.
dynamoDBClientConfig ConfigsBuilder Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes.
cloudWatchClientConfig ConfigsBuilder Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes.
taskBackoffTimeMillis LifecycleConfig El tiempo que se debe esperar para reintentar la tareas con errores.
metricsBufferTimeMillis MetricsConfig Controla la publicación de CloudWatch métricas.
metricsMaxQueueSize MetricsConfig Controla la publicación de CloudWatch métricas.
metricsLevel MetricsConfig Controla la publicación de CloudWatch métricas.
metricsEnabledDimensions MetricsConfig Controla la publicación de CloudWatch métricas.
validateSequenceNumberBeforeCheckpointing CheckpointConfig Esta opción se ha eliminado. Consulte Validación del número de secuencia del punto de comprobación.
regionName ConfigsBuilder Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes.
maxLeasesForWorker LeaseManagementConfig El número máximo de asignaciones que debería aceptar una sola instancia de la aplicación.
maxLeasesToStealAtOneTime LeaseManagementConfig El número máximo de asignaciones del que debería intentar apropiarse una aplicación al mismo tiempo.
initialLeaseTableReadCapacity LeaseManagementConfig La IOPs lectura de DynamoDB que se utiliza si la biblioteca de clientes de Kinesis necesita crear una nueva tabla de concesiones de DynamoDB.
initialLeaseTableWriteCapacity LeaseManagementConfig La IOPs lectura de DynamoDB que se utiliza si la biblioteca de clientes de Kinesis necesita crear una nueva tabla de concesiones de DynamoDB.
initialPositionInStreamExtended LeaseManagementConfig La posición inicial de la secuencia en la que debería comenzar la aplicación. Esto solo se utiliza durante la creación inicial de la asignación.
skipShardSyncAtWorkerInitializationIfLeasesExist CoordinatorConfig Deshabilita la sincronización de los datos de los fragmentos si la tabla de asignaciones todavía contiene entradas. TODO: -438 KinesisEco
shardPrioritization CoordinatorConfig La priorización de fragmentos que se va a utilizar.
shutdownGraceMillis N/A Esta opción se ha eliminado. Consulte MultiLang Mudanzas.
timeoutInSeconds N/A Esta opción se ha eliminado. Consulte MultiLang Mudanzas.
retryGetRecordsInSeconds PollingConfig Configura el retraso entre GetRecords intentos en caso de error.
maxGetRecordsThreadPool PollingConfig El tamaño del grupo de subprocesos utilizado para GetRecords.
maxLeaseRenewalThreads LeaseManagementConfig Controla el tamaño del grupo de subprocesos del renovador de asignaciones. Cuanto más grande sea el número de asignaciones que puede tomar la aplicación, más grande debe ser este grupo.
recordsFetcherFactory PollingConfig Permite sustituir el generador que se utiliza para crear capturadores que recuperan datos de las secuencias.
logWarningForTaskAfterMillis LifecycleConfig Tiempo que se debe esperar antes de registrar una advertencia si una tarea no ha finalizado.
listShardsBackoffTimeInMillis RetrievalConfig El número de milisegundos que se debe esperar entre llamadas a ListShards cuando se producen errores.
maxListShardsRetryAttempts RetrievalConfig El número máximo de veces que se reintenta ListShards antes de desistir.

Eliminación del tiempo de inactividad

En la versión 1.x delKCL, idleTimeBetweenReadsInMillis correspondían a dos cantidades:

  • La cantidad de tiempo entre envíos de tareas. Ahora puede configurar este tiempo entre tareas estableciendo CoordinatorConfig#shardConsumerDispatchPollIntervalMillis.

  • La cantidad de tiempo en reposo cuando no se devuelven registros desde Kinesis Data Streams. En la versión 2.0, con la distribución ramificada mejorada, los registros se envían desde sus respectivos recuperadores. Solo se produce actividad en el consumidor del fragmento cuando se recibe una solicitud enviada.

Eliminaciones de la configuración del cliente

En la versión 2.0, ya KCL no crea clientes. El usuario es el responsable de suministrar un cliente válido. Con este cambio, se han eliminado todos los parámetros de configuración que controlaban la creación de clientes. Si necesita estos parámetros, puede establecerlos en los clientes antes de proporcionar clientes a ConfigsBuilder.

Campo eliminado Configuración equivalente
kinesisEndpoint Configure el SDK KinesisAsyncClient con el punto final preferido:KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis endpoint>")).build().
dynamoDBEndpoint Configure el SDK DynamoDbAsyncClient con el punto final preferido:DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb endpoint>")).build().
kinesisClientConfig Configure el SDK KinesisAsyncClient con la configuración necesaria:KinesisAsyncClient.builder().overrideConfiguration(<your configuration>).build().
dynamoDBClientConfig Configure el SDK DynamoDbAsyncClient con la configuración necesaria:DynamoDbAsyncClient.builder().overrideConfiguration(<your configuration>).build().
cloudWatchClientConfig Configure el SDK CloudWatchAsyncClient con la configuración necesaria:CloudWatchAsyncClient.builder().overrideConfiguration(<your configuration>).build().
regionName Configure el SDK con la región preferida. Esto es igual para todos los SDK clientes. Por ejemplo, KinesisAsyncClient.builder().region(Region.US_WEST_2).build().