

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Migrar consumidores de KCL 1.x a KCL 2.x
<a name="kcl-migration"></a>

**importante**  
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. La versión 1.x de KCL estará disponible el 30 de end-of-support enero de 2026. **Recomendamos** que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de la [biblioteca de clientes de Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) en. GitHub Para obtener información sobre las versiones más recientes de KCL, consulte [Uso de Kinesis Client Library](kcl.md). Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte [Migración de KCL 1.x a KCL 3.x](kcl-migration-1-3.md).

En este tema se explican las diferencias entre las versiones 1.x y 2.x de Kinesis Client Library (KCL). También le muestra cómo migrar el consumidor de la versión 1.x a la versión 2.x de KCL. Después de migrar su cliente, comenzará el procesamiento de registros a partir de la última ubicación del punto de comprobación.

La versión 2.0 de KCL presenta los siguientes cambios en la interfaz:


**Cambios en la interfaz de KCL**  

| Interfaz de KCL 1.x | Interfaz de KCL 2.0 | 
| --- | --- | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor | software.amazon.kinesis.processor.ShardRecordProcessor | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory | software.amazon.kinesis.processor.ShardRecordProcessorFactory | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware | Plegado en software.amazon.kinesis.processor.ShardRecordProcessor | 

**Topics**
+ [Migrar el procesador de registros](#recrod-processor-migration)
+ [Migrar el generador de procesadores de registros](#recrod-processor-factory-migration)
+ [Migrar el proceso de trabajo](#worker-migration)
+ [Configurar el cliente de Amazon Kinesis](#client-configuration)
+ [Eliminar el tiempo de inactividad](#idle-time-removal)
+ [Eliminar la configuración de clientes](#client-configuration-removals)

## Migrar el procesador de registros
<a name="recrod-processor-migration"></a>

En el siguiente ejemplo, se muestra un procesador de registros implementado para la versión 1.x de KCL:

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        //
        // Process records, and possibly checkpoint
        //
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**Para migrar la clase del procesador de registros**

1. Cambie las interfaces de `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` y `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` a `software.amazon.kinesis.processor.ShardRecordProcessor`, tal y como se indica a continuación:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
   public class TestRecordProcessor implements ShardRecordProcessor {
   ```

1. Actualice las instrucciones `import` para los métodos `initialize` y `processRecords`.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
   ```

1. Sustituya el método `shutdown` por los métodos nuevos siguientes: `leaseLost`, `shardEnded` y `shutdownRequested`.

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

A continuación, se muestra la versión actualizada de la clase del procesador de registros.

```
package com.amazonaws.kcl;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class TestRecordProcessor implements ShardRecordProcessor {
    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

## Migrar el generador de procesadores de registros
<a name="recrod-processor-factory-migration"></a>

El generador de procesadores de registros es responsable de la creación de procesadores de registros cuando se adquiere una asignación. A continuación, se muestra un ejemplo de un generador de la versión 1.x de KCL.

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class TestRecordProcessorFactory implements IRecordProcessorFactory {
    @Override
    public IRecordProcessor createProcessor() {
        return new TestRecordProcessor();
    }
}
```

**Para migrar el generador de procesadores de registros**

1. Cambie la interfaz implementada de `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` a `software.amazon.kinesis.processor.ShardRecordProcessorFactory`, tal y como se indica a continuación:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
   
   // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
   public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
   ```

1. Cambie la firma de retorno de `createProcessor`.

   ```
   // public IRecordProcessor createProcessor() {
   public ShardRecordProcessor shardRecordProcessor() {
   ```

A continuación, se muestra un ejemplo de generador de procesadores de registros de la versión 2.0:

```
package com.amazonaws.kcl;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new TestRecordProcessor();
    }
}
```

## Migrar el proceso de trabajo
<a name="worker-migration"></a>

En la versión 2.0 de KCL, una nueva clase, llamada `Scheduler`, sustituye a la clase `Worker`. A continuación, se muestra un ejemplo de proceso de trabajo de la versión 1.x de KCL.

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

**Para migrar el proceso de trabajo**

1. Cambie la instrucción `import` para la clase `Worker` por las instrucciones de importación para las clases `Scheduler` y `ConfigsBuilder`.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. Cree un `ConfigsBuilder` y un `Scheduler` como se muestra en el ejemplo siguiente.

   Es recomendable que utilice `KinesisClientUtil` para crear `KinesisAsyncClient` y para establecer `maxConcurrency` en `KinesisAsyncClient`.
**importante**  
El cliente de Amazon Kinesis puede experimentar un importante aumento de la latencia, a menos que configure `KinesisAsyncClient` de forma que el valor de `maxConcurrency` sea lo suficientemente alto para permitir todas las asignaciones, además de los usos adicionales de `KinesisAsyncClient`.

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   import software.amazon.kinesis.common.ConfigsBuilder;
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
   Region region = Region.AP_NORTHEAST_2;
   KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
   ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());
   
   Scheduler scheduler = new Scheduler(
       configsBuilder.checkpointConfig(),
       configsBuilder.coordinatorConfig(),
       configsBuilder.leaseManagementConfig(),
       configsBuilder.lifecycleConfig(),
       configsBuilder.metricsConfig(),
       configsBuilder.processorConfig(),
       configsBuilder.retrievalConfig()
       );
   ```

## Configurar el cliente de Amazon Kinesis
<a name="client-configuration"></a>

Con el lanzamiento de la versión 2.0 de Kinesis Client Library, la configuración del cliente pasa de tener una única clase de configuración (`KinesisClientLibConfiguration`) a tener seis clases de configuración. En la tabla siguiente, se describe la migración.


**Campos de configuración y sus clases nuevas**  

| Campo original | Clase de configuración nueva | Description (Descripción) | 
| --- | --- | --- | 
| applicationName | ConfigsBuilder | El nombre de la aplicación de KCL. Se utiliza de forma predeterminada para tableName y consumerName. | 
| tableName | ConfigsBuilder | Permite sustituir el nombre de la tabla que se utiliza para la tabla de asignaciones de Amazon DynamoDB. | 
| streamName | ConfigsBuilder | El nombre de la secuencia cuyos registros procesa de esta aplicación. | 
| kinesisEndpoint | ConfigsBuilder | Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes. | 
| dynamoDBEndpoint | ConfigsBuilder | Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes. | 
| initialPositionInStreamExtended | RetrievalConfig | La ubicación de la partición desde la que KCL comienza a recuperar registros, empezando por la ejecución inicial de la aplicación. | 
| kinesisCredentialsProvider | ConfigsBuilder | Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes. | 
| dynamoDBCredentialsProvider | ConfigsBuilder | Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes. | 
| cloudWatchCredentialsProvider | ConfigsBuilder | Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes. | 
| failoverTimeMillis | LeaseManagementConfig | El número de milisegundos que deben transcurrir antes de que se pueda considerar que se ha producido un error en el propietario de una asignación. | 
| workerIdentifier | ConfigsBuilder | Identificador único que representa esta instancia del procesador de aplicaciones. Deben ser único. | 
| shardSyncIntervalMillis | LeaseManagementConfig | El tiempo entre llamadas de sincronización del fragmento. | 
| maxRecords | PollingConfig | Permite establecer el número máximo de registros que devuelve Kinesis. | 
| idleTimeBetweenReadsInMillis | CoordinatorConfig | Esta opción se ha eliminado. Consulte Eliminación del tiempo de inactividad. | 
| callProcessRecordsEvenForEmptyRecordList | ProcessorConfig | Cuando se establece, se llama al procesador de registros incluso cuando no se proporciona ningún registro de Kinesis. | 
| parentShardPollIntervalMillis | CoordinatorConfig | Determina la frecuencia con que debería sondear un procesador de registros para ver si el fragmento principal se ha completado. | 
| cleanupLeasesUponShardCompletion | LeaseManagementConfig | Cuando se establece, se eliminan las asignaciones tan pronto como se inicia el procesamiento del fragmento secundario. | 
| ignoreUnexpectedChildShards | LeaseManagementConfig | Cuando se establece, los fragmentos secundarios que tienen un fragmento abierto se pasan por alto. Esto es principalmente para DynamoDB Streams. | 
| kinesisClientConfig | ConfigsBuilder | Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes. | 
| dynamoDBClientConfig | ConfigsBuilder | Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes. | 
| cloudWatchClientConfig | ConfigsBuilder | Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes. | 
| taskBackoffTimeMillis | LifecycleConfig | El tiempo que se debe esperar para reintentar la tareas con errores. | 
| metricsBufferTimeMillis | MetricsConfig | Controla la publicación de CloudWatch métricas. | 
| metricsMaxQueueSize | MetricsConfig | Controla la publicación de CloudWatch métricas. | 
| metricsLevel | MetricsConfig | Controla la publicación de CloudWatch métricas. | 
| metricsEnabledDimensions | MetricsConfig | Controla la publicación de CloudWatch métricas. | 
| validateSequenceNumberBeforeCheckpointing | CheckpointConfig | Esta opción se ha eliminado. Consulte Validación del número de secuencia del punto de comprobación. | 
| regionName | ConfigsBuilder | Esta opción se ha eliminado. Consulte Eliminación de los parámetros de configuración de clientes. | 
| maxLeasesForWorker | LeaseManagementConfig | El número máximo de asignaciones que debería aceptar una sola instancia de la aplicación. | 
| maxLeasesToStealAtOneTime | LeaseManagementConfig | El número máximo de asignaciones del que debería intentar apropiarse una aplicación al mismo tiempo. | 
| initialLeaseTableReadCapacity | LeaseManagementConfig | La IOPs lectura de DynamoDB que se utiliza si la biblioteca de clientes de Kinesis necesita crear una nueva tabla de concesiones de DynamoDB. | 
| initialLeaseTableWriteCapacity | LeaseManagementConfig | La IOPs lectura de DynamoDB que se utiliza si la biblioteca de clientes de Kinesis necesita crear una nueva tabla de concesiones de DynamoDB. | 
| initialPositionInStreamExtended | LeaseManagementConfig | La posición inicial de la secuencia en la que debería comenzar la aplicación. Esto solo se utiliza durante la creación inicial de la asignación. | 
| skipShardSyncAtWorkerInitializationIfLeasesExist | CoordinatorConfig | Deshabilita la sincronización de los datos de los fragmentos si la tabla de asignaciones todavía contiene entradas. KinesisEcoTODO: -438 | 
| shardPrioritization | CoordinatorConfig | La priorización de fragmentos que se va a utilizar. | 
| shutdownGraceMillis | N/A | Esta opción se ha eliminado. Consulte MultiLang Mudanzas. | 
| timeoutInSeconds | N/A | Esta opción se ha eliminado. Consulte MultiLang Mudanzas. | 
| retryGetRecordsInSeconds | PollingConfig | Configura el retraso entre GetRecords intentos en caso de error. | 
| maxGetRecordsThreadPool | PollingConfig | El tamaño del grupo de subprocesos utilizado para GetRecords. | 
| maxLeaseRenewalThreads | LeaseManagementConfig | Controla el tamaño del grupo de subprocesos del renovador de asignaciones. Cuanto más grande sea el número de asignaciones que puede tomar la aplicación, más grande debe ser este grupo. | 
| recordsFetcherFactory | PollingConfig | Permite sustituir el generador que se utiliza para crear capturadores que recuperan datos de las secuencias. | 
| logWarningForTaskAfterMillis | LifecycleConfig | Tiempo que se debe esperar antes de registrar una advertencia si una tarea no ha finalizado. | 
| listShardsBackoffTimeInMillis | RetrievalConfig | El número de milisegundos que se debe esperar entre llamadas a ListShards cuando se producen errores. | 
| maxListShardsRetryAttempts | RetrievalConfig | El número máximo de veces que se reintenta ListShards antes de desistir. | 

## Eliminar el tiempo de inactividad
<a name="idle-time-removal"></a>

En la versión 1.x de KCL, `idleTimeBetweenReadsInMillis` correspondía a dos cantidades: 
+ La cantidad de tiempo entre envíos de tareas. Ahora puede configurar este tiempo entre tareas estableciendo `CoordinatorConfig#shardConsumerDispatchPollIntervalMillis`.
+ La cantidad de tiempo en reposo cuando no se devuelven registros desde Kinesis Data Streams. En la versión 2.0, con la distribución ramificada mejorada, los registros se envían desde sus respectivos recuperadores. Solo se produce actividad en el consumidor del fragmento cuando se recibe una solicitud enviada. 

## Eliminar la configuración de clientes
<a name="client-configuration-removals"></a>

En la versión 2.0, KCL ya no crea los clientes. El usuario es el responsable de suministrar un cliente válido. Con este cambio, se han eliminado todos los parámetros de configuración que controlaban la creación de clientes. Si necesita estos parámetros, puede establecerlos en los clientes antes de proporcionar clientes a `ConfigsBuilder`.


****  

| Campo eliminado | Configuración equivalente | 
| --- | --- | 
| kinesisEndpoint | Configure el SDK KinesisAsyncClient con el punto de enlace preferido: KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis endpoint>")).build(). | 
| dynamoDBEndpoint | Configure el SDK DynamoDbAsyncClient con el punto de enlace preferido: DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb endpoint>")).build(). | 
| kinesisClientConfig | Configure el SDK KinesisAsyncClient con la configuración necesaria: KinesisAsyncClient.builder().overrideConfiguration(<your configuration>).build(). | 
| dynamoDBClientConfig | Configure el SDK DynamoDbAsyncClient con la configuración necesaria: DynamoDbAsyncClient.builder().overrideConfiguration(<your configuration>).build(). | 
| cloudWatchClientConfig | Configure el SDK CloudWatchAsyncClient con la configuración necesaria: CloudWatchAsyncClient.builder().overrideConfiguration(<your configuration>).build(). | 
| regionName | Configure el SDK con la región preferida. Es la misma para todos los clientes del SDK. Por ejemplo, KinesisAsyncClient.builder().region(Region.US\$1WEST\$12).build(). | 