

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Migra i consumatori da KCL 1.x a KCL 2.x
<a name="kcl-migration"></a>

**Importante**  
Le versioni 1.x e 2.x di Amazon KCL sono obsolete. KCL 1.x arriverà il 30 gennaio 2026. end-of-support **Ti consigliamo vivamente di** migrare le tue applicazioni KCL utilizzando la versione 1.x all'ultima versione di KCL prima del 30 gennaio 2026. Per trovare la versione più recente di KCL, consulta la pagina [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) su. GitHub Per informazioni sulle ultime versioni di KCL, consulta. [Usa la libreria client Kinesis](kcl.md) Per informazioni sulla migrazione da KCL 1.x a KCL 3.x, consulta. [Migrazione di KLC da 1.x a 3.x](kcl-migration-1-3.md)

Questo argomento descrive le differenze tra le versioni 1.x e 2.x della Kinesis Client Library (KCL). Viene inoltre illustrato come migrare l'applicazione consumer dalla versione 1.x alla versione 2.x della KCL. Dopo la migrazione, il client avvierà l'elaborazione dei record dall'ultimo punto di controllo verificato.

La versione 2.0 della KCL introduce le seguenti modifiche di interfaccia:


**Modifiche di interfaccia KCL**  

| Interfaccia KCL 1.x | Interfaccia KCL 2.0 | 
| --- | --- | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor | software.amazon.kinesis.processor.ShardRecordProcessor | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory | software.amazon.kinesis.processor.ShardRecordProcessorFactory | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware | Piegata in software.amazon.kinesis.processor.ShardRecordProcessor | 

**Topics**
+ [Migrare il processore di registrazione](#recrod-processor-migration)
+ [Eseguire la migrazione della fabbrica del processore di registrazione](#recrod-processor-factory-migration)
+ [Migrare il lavoratore](#worker-migration)
+ [Configurazione del client Amazon Kinesis](#client-configuration)
+ [Rimozione dei tempi di inattività](#idle-time-removal)
+ [Rimozioni della configurazione del client](#client-configuration-removals)

## Migrare il processore di registrazione
<a name="recrod-processor-migration"></a>

L'esempio seguente mostra un elaboratore di record implementato per &KCL; 1.x:

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        //
        // Process records, and possibly checkpoint
        //
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**Per migrare la classe dell'elaboratore di record**

1. Modifica le interfacce da `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` e `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` verso `software.amazon.kinesis.processor.ShardRecordProcessor`, come segue:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
   public class TestRecordProcessor implements ShardRecordProcessor {
   ```

1. Aggiorna le istruzioni `import` per i metodi `initialize` e `processRecords`.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
   ```

1. Sostituisci il metodo `shutdown` con i seguenti nuovi metodi: `leaseLost`, `shardEnded` e `shutdownRequested`.

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

Segue la versione aggiornata della classe dell'elaboratore di record.

```
package com.amazonaws.kcl;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class TestRecordProcessor implements ShardRecordProcessor {
    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

## Eseguire la migrazione della fabbrica del processore di registrazione
<a name="recrod-processor-factory-migration"></a>

La fabbrica dell'elaboratore di record è responsabile per la creazione di elaboratori di record quando un lease è acquisito. Di seguito è illustrato un esempio di una fabbrica &KCL; 1.x.

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class TestRecordProcessorFactory implements IRecordProcessorFactory {
    @Override
    public IRecordProcessor createProcessor() {
        return new TestRecordProcessor();
    }
}
```

**Per migrare la fabbrica dell'elaboratore di record**

1. Modifica l'interfaccia implementata da `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` a `software.amazon.kinesis.processor.ShardRecordProcessorFactory`, come segue.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
   
   // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
   public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
   ```

1. Modifica la firma di ritorno per `createProcessor`.

   ```
   // public IRecordProcessor createProcessor() {
   public ShardRecordProcessor shardRecordProcessor() {
   ```

Di seguito è riportato un esempio di fabbrica di elaboratore di record in 2.0:

```
package com.amazonaws.kcl;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new TestRecordProcessor();
    }
}
```

## Migrare il lavoratore
<a name="worker-migration"></a>

Nella versione 2.0 della KCL, una nuova classe, denominata `Scheduler`, sostituisce la classe `Worker`. Di seguito è illustrato un esempio di un worker di KCL 1.x.

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

**Per migrare il lavoratore**

1. Modifica la dichiarazione `import` per la classe `Worker` nelle dichiarazioni di importazione delle classi `Scheduler` e `ConfigsBuilder`.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. Crea `ConfigsBuilder` e `Scheduler` come mostrato nell'esempio seguente.

   Si consiglia di utilizzare `KinesisClientUtil` per creare `KinesisAsyncClient` e configurare `maxConcurrency` in `KinesisAsyncClient`.
**Importante**  
Il client Amazon Kinesis potrebbe presentare un aumento significativo della latenza, a meno che non venga configurato `KinesisAsyncClient` per avere una `maxConcurrency` sufficientemente alta per consentire tutti i canoni più ulteriori utilizzi di `KinesisAsyncClient`.

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   import software.amazon.kinesis.common.ConfigsBuilder;
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
   Region region = Region.AP_NORTHEAST_2;
   KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
   ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());
   
   Scheduler scheduler = new Scheduler(
       configsBuilder.checkpointConfig(),
       configsBuilder.coordinatorConfig(),
       configsBuilder.leaseManagementConfig(),
       configsBuilder.lifecycleConfig(),
       configsBuilder.metricsConfig(),
       configsBuilder.processorConfig(),
       configsBuilder.retrievalConfig()
       );
   ```

## Configurazione del client Amazon Kinesis
<a name="client-configuration"></a>

Con il rilascio 2.0 della Kinesis Client Library, la configurazione del client si è spostata da una singola classe di configurazione (`KinesisClientLibConfiguration`) a sei classi di configurazione. La tabella seguente descrive la migrazione.


**Campi di configurazione e relative nuove classi**  

| Campo originale | Nuova classe di configurazione | Description | 
| --- | --- | --- | 
| applicationName | ConfigsBuilder | Il nome per l'applicazione della KCL. Utilizzato come predefinito per tableName e consumerName. | 
| tableName | ConfigsBuilder | Consente di ignorare il nome della tabella utilizzato per la tabella di lease di Amazon DynamoDB. | 
| streamName | ConfigsBuilder | Il nome del flusso dal quale l'applicazione elabora i record. | 
| kinesisEndpoint | ConfigsBuilder | Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni. | 
| dynamoDBEndpoint | ConfigsBuilder | Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni. | 
| initialPositionInStreamExtended | RetrievalConfig | La posizione nello shard da cui il KCL inizia a recuperare i record, a partire dall'esecuzione iniziare all'applicazione. | 
| kinesisCredentialsProvider | ConfigsBuilder | Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni. | 
| dynamoDBCredentialsProvider | ConfigsBuilder | Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni. | 
| cloudWatchCredentialsProvider | ConfigsBuilder | Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni. | 
| failoverTimeMillis | LeaseManagementConfig | Il numero di millisecondi che devono passare prima di poter considerare un proprietario di lease come fallito. | 
| workerIdentifier | ConfigsBuilder | Un identificatore univoco che rappresenta la creazione dell'elaboratore di applicazione. Deve essere univoco. | 
| shardSyncIntervalMillis | LeaseManagementConfig | Il periodo di tempo tra le chiamate di sincronizzazione dello shard. | 
| maxRecords | PollingConfig | Consente di impostare il numero massimo di record restituiti da Kinesis. | 
| idleTimeBetweenReadsInMillis | CoordinatorConfig | Questa opzione è stata eliminata. Vedi rimozione tempo di inattività. | 
| callProcessRecordsEvenForEmptyRecordList | ProcessorConfig | Quando impostato, l'elaboratore di record viene chiamato anche quando nessun record è stato fornito da . | 
| parentShardPollIntervalMillis | CoordinatorConfig | Con quale frequenza un elaboratore di record deve eseguire il polling per vedere se il shard padre è stata completato. | 
| cleanupLeasesUponShardCompletion | LeaseManagementConfig | Quando impostati, i lease vengono rimossi non appena i lease figlio hanno iniziato l'elaborazione. | 
| ignoreUnexpectedChildShards | LeaseManagementConfig | Quando impostato, i shard figlio che hanno un shard aperto vengono ignorati. Questo è principalmente per DynamoDB Streams. | 
| kinesisClientConfig | ConfigsBuilder | Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni. | 
| dynamoDBClientConfig | ConfigsBuilder | Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni. | 
| cloudWatchClientConfig | ConfigsBuilder | Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni. | 
| taskBackoffTimeMillis | LifecycleConfig | Il tempo di attesa per riprovare operazioni non riuscite. | 
| metricsBufferTimeMillis | MetricsConfig | Controlla la pubblicazione CloudWatch delle metriche. | 
| metricsMaxQueueSize | MetricsConfig | Controlla la pubblicazione CloudWatch delle metriche. | 
| metricsLevel | MetricsConfig | Controlla la pubblicazione CloudWatch delle metriche. | 
| metricsEnabledDimensions | MetricsConfig | Controlla la pubblicazione CloudWatch delle metriche. | 
| validateSequenceNumberBeforeCheckpointing | CheckpointConfig | Questa opzione è stata eliminata. Vedi la convalida del numero di sequenza del checkpoint. | 
| regionName | ConfigsBuilder | Questa opzione è stata eliminata. Vedi la rimozione della configurazione client. | 
| maxLeasesForWorker | LeaseManagementConfig | Il numero massimo di lease che una singola istanza dell'applicazione deve accettare. | 
| maxLeasesToStealAtOneTime | LeaseManagementConfig | Il numero massimo di lease che un'applicazione deve tentare di intercettare simultaneamente. | 
| initialLeaseTableReadCapacity | LeaseManagementConfig | La IOPs lettura DynamoDB utilizzata se la Kinesis Client Library deve creare una nuova tabella di lease DynamoDB. | 
| initialLeaseTableWriteCapacity | LeaseManagementConfig | La IOPs lettura DynamoDB utilizzata se la Kinesis Client Library deve creare una nuova tabella di lease DynamoDB. | 
| initialPositionInStreamExtended | LeaseManagementConfig | La posizione iniziale nel flusso nella quale l'applicazione dovrebbe iniziare. Questo viene utilizzato soltanto durante la creazione del lease iniziale. | 
| skipShardSyncAtWorkerInitializationIfLeasesExist | CoordinatorConfig | Disabilita la sincronizzazione dei dati shard se la tabella di lease contiene lease esistenti. DA KinesisEco FARE: -438 | 
| shardPrioritization | CoordinatorConfig | Quale prioritizzazione shard utilizzare. | 
| shutdownGraceMillis | N/D | Questa opzione è stata eliminata. Vedi Traslochi MultiLang . | 
| timeoutInSeconds | N/D | Questa opzione è stata eliminata. Vedi MultiLang Rimozioni. | 
| retryGetRecordsInSeconds | PollingConfig | Configura il ritardo tra i GetRecords tentativi di errore. | 
| maxGetRecordsThreadPool | PollingConfig | La dimensione del pool di thread utilizzato per. GetRecords | 
| maxLeaseRenewalThreads | LeaseManagementConfig | Controlla le dimensioni del pool di thread di rinnovo del lease. Quanto maggiori sono i lease che può richiedere l'applicazione, tanto più grande deve essere questo pool. | 
| recordsFetcherFactory | PollingConfig | Consente di sostituire la factory utilizzata per creare fetcher che recuperano dai flussi. | 
| logWarningForTaskAfterMillis | LifecycleConfig | Quanto tempo bisogna attendere prima che venga registrato un avviso se un'attività non è stata completata. | 
| listShardsBackoffTimeInMillis | RetrievalConfig | Il numero di millisecondi di attesa tra le chiamate in ListShards quando si verificano errori. | 
| maxListShardsRetryAttempts | RetrievalConfig | Il numero massimo di volte che ListShards effettua nuovi tentativi prima di desistere. | 

## Rimozione dei tempi di inattività
<a name="idle-time-removal"></a>

Nella versione 1.x di &KCL;, `idleTimeBetweenReadsInMillis` corrispondeva a due quantità: 
+ La quantità di tempo tra controlli dell'attività. È ora possibile configurare questo periodo tra attività impostando `CoordinatorConfig#shardConsumerDispatchPollIntervalMillis`.
+ La quantità di tempo di sospensione quando non è stato restituito alcun record da . Nella versione 2.0, nel fan-out ottimizzato, i record vengono inviati da chi li ha recuperati. L'attività sul consumo di shard avviene solo quando arriva una richiesta di push. 

## Rimozioni della configurazione del client
<a name="client-configuration-removals"></a>

Nella versione 2.0, la KCL non crea più client. Spetta all'utente fornire un client valido. Con questa modifica, tutti i parametri di configurazione che controllavano la creazione del client sono stati rimossi. Se hai bisogno di questi parametri, puoi impostarli sui client prima di fornire i client a `ConfigsBuilder`.


****  

| Campo rimosso | Configurazione equivalente | 
| --- | --- | 
| kinesisEndpoint | Configura SDK KinesisAsyncClient con l'endpoint preferito: KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis endpoint>")).build(). | 
| dynamoDBEndpoint | Configura SDK DynamoDbAsyncClient con l'endpoint preferito: DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb endpoint>")).build(). | 
| kinesisClientConfig | Configura SDK KinesisAsyncClient con la configurazione necessaria: KinesisAsyncClient.builder().overrideConfiguration(<your configuration>).build(). | 
| dynamoDBClientConfig | Configura SDK DynamoDbAsyncClient con la configurazione necessaria: DynamoDbAsyncClient.builder().overrideConfiguration(<your configuration>).build(). | 
| cloudWatchClientConfig | Configura SDK CloudWatchAsyncClient con la configurazione necessaria: CloudWatchAsyncClient.builder().overrideConfiguration(<your configuration>).build(). | 
| regionName | Configura SDK con la regione preferita. Questo è uguale per tutti i client SDK. Ad esempio, KinesisAsyncClient.builder().region(Region.US\$1WEST\$12).build(). | 