Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Migre a los consumidores de la KCL versión 1.x a la KCL 2.x
En este tema se explican las diferencias entre las versiones 1.x y 2.x de la biblioteca de clientes de Kinesis (). KCL También le muestra cómo migrar a su consumidor de la versión 1.x a la versión 2.x de. KCL Después de migrar su cliente, comenzará el procesamiento de registros a partir de la última ubicación del punto de comprobación.
La versión 2.0 de KCL introduce los siguientes cambios en la interfaz:
KCLCambios en la interfaz | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
KCLInterfaz 1.x | KCLInterfaz 2.0 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor |
software.amazon.kinesis.processor.ShardRecordProcessor |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory |
software.amazon.kinesis.processor.ShardRecordProcessorFactory |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware |
Plegado en software.amazon.kinesis.processor.ShardRecordProcessor |
Temas
Migre el procesador de registros
El siguiente ejemplo muestra un procesador de registros implementado para KCL 1.x:
package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { // // Process records, and possibly checkpoint // } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
Para migrar la clase del procesador de registros
-
Cambie las interfaces de
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor
ycom.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware
asoftware.amazon.kinesis.processor.ShardRecordProcessor
, tal y como se indica a continuación:// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import software.amazon.kinesis.processor.ShardRecordProcessor; // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { public class TestRecordProcessor implements ShardRecordProcessor {
-
Actualice las instrucciones
import
para los métodosinitialize
yprocessRecords
.// import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
-
Sustituya el método
shutdown
por los métodos nuevos siguientes:leaseLost
,shardEnded
yshutdownRequested
.// @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }
A continuación, se muestra la versión actualizada de la clase del procesador de registros.
package com.amazonaws.kcl; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.ShardRecordProcessor; public class TestRecordProcessor implements ShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }
Migre la fábrica de procesadores de discos
El generador de procesadores de registros es responsable de la creación de procesadores de registros cuando se adquiere una asignación. El siguiente es un ejemplo de una fábrica KCL 1.x.
package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class TestRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new TestRecordProcessor(); } }
Para migrar el generador de procesadores de registros
-
Cambie la interfaz implementada de
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
asoftware.amazon.kinesis.processor.ShardRecordProcessorFactory
, tal y como se indica a continuación:// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
-
Cambie la firma de retorno de
createProcessor
.// public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {
A continuación, se muestra un ejemplo de generador de procesadores de registros de la versión 2.0:
package com.amazonaws.kcl; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class TestRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new TestRecordProcessor(); } }
Migre al trabajador
En la versión 2.0 deKCL, una nueva clase, denominadaScheduler
, reemplaza a la Worker
clase. El siguiente es un ejemplo de un trabajador KCL 1.x.
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();
Para migrar el proceso de trabajo
-
Cambie la instrucción
import
para la claseWorker
por las instrucciones de importación para las clasesScheduler
yConfigsBuilder
.// import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
-
Cree un
ConfigsBuilder
y unScheduler
como se muestra en el ejemplo siguiente.Es recomendable que utilice
KinesisClientUtil
para crearKinesisAsyncClient
y para establecermaxConcurrency
enKinesisAsyncClient
.importante
El cliente de Amazon Kinesis puede experimentar un importante aumento de la latencia, a menos que configure
KinesisAsyncClient
de forma que el valor demaxConcurrency
sea lo suficientemente alto para permitir todas las asignaciones, además de los usos adicionales deKinesisAsyncClient
.import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... Region region = Region.AP_NORTHEAST_2; KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region)); DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory()); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() );
Configurar el cliente de Amazon Kinesis
Con el lanzamiento de la versión 2.0 de Kinesis Client Library, la configuración del cliente pasa de tener una única clase de configuración (KinesisClientLibConfiguration
) a tener seis clases de configuración. En la tabla siguiente, se describe la migración.
Campos de configuración y sus clases nuevas | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Campo original | Clase de configuración nueva | Descripción | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
applicationName |
ConfigsBuilder |
El nombre de la KCL aplicación. Se utiliza de forma predeterminada para tableName y consumerName . |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
tableName |
ConfigsBuilder |
Permite sustituir el nombre de la tabla que se utiliza para la tabla de asignaciones de Amazon DynamoDB. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
streamName |
ConfigsBuilder |
El nombre de la secuencia cuyos registros procesa de esta aplicación. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
kinesisEndpoint |
ConfigsBuilder |
Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
dynamoDBEndpoint |
ConfigsBuilder |
Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
initialPositionInStreamExtended |
RetrievalConfig |
La ubicación del fragmento desde la que KCL comienza a buscar registros, empezando por la ejecución inicial de la aplicación. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
kinesisCredentialsProvider |
ConfigsBuilder |
Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
dynamoDBCredentialsProvider |
ConfigsBuilder |
Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cloudWatchCredentialsProvider |
ConfigsBuilder |
Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
failoverTimeMillis |
LeaseManagementConfig | El número de milisegundos que deben transcurrir antes de que se pueda considerar que se ha producido un error en el propietario de una asignación. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
workerIdentifier |
ConfigsBuilder |
Identificador único que representa esta instancia del procesador de aplicaciones. Deben ser único. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
shardSyncIntervalMillis |
LeaseManagementConfig | El tiempo entre llamadas de sincronización del fragmento. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
maxRecords |
PollingConfig |
Permite establecer el número máximo de registros que devuelve Kinesis. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
idleTimeBetweenReadsInMillis |
CoordinatorConfig |
Esta opción se ha eliminado. Consulte Eliminación del tiempo de inactividad. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
callProcessRecordsEvenForEmptyRecordList |
ProcessorConfig |
Cuando se establece, se llama al procesador de registros incluso cuando no se proporciona ningún registro de Kinesis. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
parentShardPollIntervalMillis |
CoordinatorConfig |
Determina la frecuencia con que debería sondear un procesador de registros para ver si el fragmento principal se ha completado. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cleanupLeasesUponShardCompletion |
LeaseManagementConfig |
Cuando se establece, se eliminan las asignaciones tan pronto como se inicia el procesamiento del fragmento secundario. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ignoreUnexpectedChildShards |
LeaseManagementConfig |
Cuando se establece, los fragmentos secundarios que tienen un fragmento abierto se pasan por alto. Esto es principalmente para DynamoDB Streams. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
kinesisClientConfig |
ConfigsBuilder |
Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
dynamoDBClientConfig |
ConfigsBuilder |
Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cloudWatchClientConfig |
ConfigsBuilder |
Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
taskBackoffTimeMillis |
LifecycleConfig |
El tiempo que se debe esperar para reintentar la tareas con errores. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
metricsBufferTimeMillis |
MetricsConfig |
Controla la publicación de CloudWatch métricas. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
metricsMaxQueueSize |
MetricsConfig |
Controla la publicación de CloudWatch métricas. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
metricsLevel |
MetricsConfig |
Controla la publicación de CloudWatch métricas. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
metricsEnabledDimensions |
MetricsConfig |
Controla la publicación de CloudWatch métricas. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
validateSequenceNumberBeforeCheckpointing |
CheckpointConfig |
Esta opción se ha eliminado. Consulte Validación del número de secuencia del punto de comprobación. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
regionName |
ConfigsBuilder |
Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
maxLeasesForWorker |
LeaseManagementConfig |
El número máximo de asignaciones que debería aceptar una sola instancia de la aplicación. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
maxLeasesToStealAtOneTime |
LeaseManagementConfig |
El número máximo de asignaciones del que debería intentar apropiarse una aplicación al mismo tiempo. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
initialLeaseTableReadCapacity |
LeaseManagementConfig |
La IOPs lectura de DynamoDB que se utiliza si la biblioteca de clientes de Kinesis necesita crear una nueva tabla de concesiones de DynamoDB. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
initialLeaseTableWriteCapacity |
LeaseManagementConfig |
La IOPs lectura de DynamoDB que se utiliza si la biblioteca de clientes de Kinesis necesita crear una nueva tabla de concesiones de DynamoDB. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
initialPositionInStreamExtended |
LeaseManagementConfig | La posición inicial de la secuencia en la que debería comenzar la aplicación. Esto solo se utiliza durante la creación inicial de la asignación. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
skipShardSyncAtWorkerInitializationIfLeasesExist |
CoordinatorConfig |
Deshabilita la sincronización de los datos de los fragmentos si la tabla de asignaciones todavía contiene entradas. TODO: -438 KinesisEco | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
shardPrioritization |
CoordinatorConfig |
La priorización de fragmentos que se va a utilizar. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
shutdownGraceMillis |
N/A | Esta opción se ha eliminado. Consulte MultiLang Mudanzas. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
timeoutInSeconds |
N/A | Esta opción se ha eliminado. Consulte MultiLang Mudanzas. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
retryGetRecordsInSeconds |
PollingConfig |
Configura el retraso entre GetRecords intentos en caso de error. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
maxGetRecordsThreadPool |
PollingConfig |
El tamaño del grupo de subprocesos utilizado para GetRecords. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
maxLeaseRenewalThreads |
LeaseManagementConfig |
Controla el tamaño del grupo de subprocesos del renovador de asignaciones. Cuanto más grande sea el número de asignaciones que puede tomar la aplicación, más grande debe ser este grupo. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
recordsFetcherFactory |
PollingConfig |
Permite sustituir el generador que se utiliza para crear capturadores que recuperan datos de las secuencias. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
logWarningForTaskAfterMillis |
LifecycleConfig |
Tiempo que se debe esperar antes de registrar una advertencia si una tarea no ha finalizado. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
listShardsBackoffTimeInMillis |
RetrievalConfig |
El número de milisegundos que se debe esperar entre llamadas a ListShards cuando se producen errores. |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
maxListShardsRetryAttempts |
RetrievalConfig |
El número máximo de veces que se reintenta ListShards antes de desistir. |
Eliminación del tiempo de inactividad
En la versión 1.x delKCL, idleTimeBetweenReadsInMillis
correspondían a dos cantidades:
-
La cantidad de tiempo entre envíos de tareas. Ahora puede configurar este tiempo entre tareas estableciendo
CoordinatorConfig#shardConsumerDispatchPollIntervalMillis
. -
La cantidad de tiempo en reposo cuando no se devuelven registros desde Kinesis Data Streams. En la versión 2.0, con la distribución ramificada mejorada, los registros se envían desde sus respectivos recuperadores. Solo se produce actividad en el consumidor del fragmento cuando se recibe una solicitud enviada.
Eliminaciones de la configuración del cliente
En la versión 2.0, ya KCL no crea clientes. El usuario es el responsable de suministrar un cliente válido. Con este cambio, se han eliminado todos los parámetros de configuración que controlaban la creación de clientes. Si necesita estos parámetros, puede establecerlos en los clientes antes de proporcionar clientes a ConfigsBuilder
.
Campo eliminado | Configuración equivalente |
---|---|
kinesisEndpoint |
Configure el SDK KinesisAsyncClient con el punto final preferido:KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis
endpoint>")).build() . |
dynamoDBEndpoint |
Configure el SDK DynamoDbAsyncClient con el punto final preferido:DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb
endpoint>")).build() . |
kinesisClientConfig |
Configure el SDK KinesisAsyncClient con la configuración necesaria:KinesisAsyncClient.builder().overrideConfiguration(<your
configuration>).build() . |
dynamoDBClientConfig |
Configure el SDK DynamoDbAsyncClient con la configuración necesaria:DynamoDbAsyncClient.builder().overrideConfiguration(<your
configuration>).build() . |
cloudWatchClientConfig |
Configure el SDK CloudWatchAsyncClient con la configuración necesaria:CloudWatchAsyncClient.builder().overrideConfiguration(<your
configuration>).build() . |
regionName |
Configure el SDK con la región preferida. Esto es igual para todos los SDK clientes. Por ejemplo, KinesisAsyncClient.builder().region(Region.US_WEST_2).build() . |