

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Usa la libreria client Kinesis
<a name="kcl"></a>

## Cos'è Kinesis Client Library?
<a name="kcl-library-what-is"></a>

Kinesis Client Library (KCL) è una libreria software Java autonoma progettata per semplificare il processo di consumo ed elaborazione dei dati da Amazon Kinesis Data Streams. KCL gestisce molte delle attività complesse associate all'elaborazione distribuita, permettendo agli sviluppatori di concentrarsi sull'implementazione della logica aziendale per l'elaborazione dei dati. Gestisce attività come il bilanciamento del carico tra più lavoratori, la risposta agli errori dei lavoratori, il checkpoint dei record elaborati e la risposta alle variazioni del numero di shard nel flusso.

KCL viene aggiornato frequentemente per incorporare versioni più recenti delle librerie sottostanti, miglioramenti della sicurezza e correzioni di bug. Ti consigliamo di utilizzare l'ultima versione di KCL per evitare problemi noti e beneficiare di tutti gli ultimi miglioramenti. Per trovare l'ultima versione di KCL, consulta [KCL](https://github.com/awslabs/amazon-kinesis-client) Github. 

**Importante**  
Ti consigliamo di utilizzare l'ultima versione di KCL per evitare bug e problemi noti. Se utilizzi KCL 2.6.0 o una versione precedente, esegui l'upgrade a KCL 2.6.1 o versione successiva per evitare una rara condizione che può bloccare l'elaborazione degli shard quando la capacità del flusso cambia. 
KCL è una libreria Java. Il supporto per linguaggi diversi da Java viene fornito utilizzando un demone basato su Java chiamato. MultiLangDaemon MultiLangDaemoninteragisce con l'applicazione KCL tramite STDIN e STDOUT. Per ulteriori informazioni su on, vedere. MultiLangDaemon GitHub [Sviluppa i consumatori con KCL in linguaggi non Java](develop-kcl-consumers-non-java.md)
Non utilizzare le AWS SDK per Java versioni da 2.27.19 a 2.27.23 con KCL 3.x. Queste versioni includono un problema che causa un errore di eccezione relativo all'utilizzo di DynamoDB da parte di KCL. Si consiglia di utilizzare la AWS SDK per Java versione 2.28.0 o successiva per evitare questo problema. 

## Caratteristiche e vantaggi principali di KCL
<a name="kcl-benefits"></a>

Di seguito sono riportate le caratteristiche principali e i relativi vantaggi del KCL:
+ **Scalabilità**: KCL consente alle applicazioni di scalare dinamicamente distribuendo il carico di elaborazione tra più lavoratori. È possibile scalare l'applicazione verso l'interno o verso l'esterno, manualmente o con l'auto-scaling, senza preoccuparsi della ridistribuzione del carico.
+ **Bilanciamento del carico**: KCL bilancia automaticamente il carico di elaborazione tra i lavoratori disponibili, garantendo una distribuzione uniforme del lavoro tra i lavoratori.
+ **Checkpointing**: KCL gestisce il checkpoint dei record elaborati, consentendo alle applicazioni di riprendere l'elaborazione dall'ultima posizione elaborata con successo.
+ **Tolleranza ai guasti**: KCL fornisce meccanismi integrati di tolleranza agli errori, che assicurano che l'elaborazione dei dati continui anche in caso di fallimento dei singoli lavoratori. KCL fornisce at-least-once anche la consegna.
+ **Gestione delle modifiche a livello di flusso**: KCL si adatta alle divisioni e alle unioni degli shard che potrebbero verificarsi a causa di variazioni nel volume dei dati. Mantiene l'ordine assicurandosi che i frammenti secondari vengano elaborati solo dopo che lo shard principale è stato completato e controllato.
+ **Monitoraggio**: KCL si integra con Amazon CloudWatch per il monitoraggio a livello di consumatore.
+ Supporto **multilingue: KCL supporta** nativamente Java e consente di utilizzare più linguaggi di programmazione non Java. MultiLangDaemon

# Concetti KCL
<a name="kcl-concepts"></a>

Questa sezione spiega i concetti e le interazioni principali di Kinesis Client Library (KCL). Questi concetti sono fondamentali per lo sviluppo e la gestione delle applicazioni consumer KCL.
+ **Applicazione consumer KCL: un'applicazione** personalizzata progettata per leggere ed elaborare i record dai flussi di dati Kinesis utilizzando la Kinesis Client Library.
+ **Worker**: le applicazioni consumer KCL sono generalmente distribuite, con uno o più lavoratori in esecuzione contemporaneamente. KCL coordina i lavoratori affinché utilizzino i dati del flusso in modo distribuito e bilancia il carico in modo uniforme tra più lavoratori.
+ **Scheduler**: una classe di alto livello utilizzata da un lavoratore KCL per iniziare l'elaborazione dei dati. Ogni lavoratore KCL ha uno scheduler. Lo scheduler inizializza e supervisiona varie attività, tra cui la sincronizzazione delle informazioni sugli shard dai flussi di dati Kinesis, il monitoraggio delle assegnazioni degli shard tra i lavoratori e l'elaborazione dei dati dal flusso in base agli shard assegnati al lavoratore. Scheduler può adottare varie configurazioni che influiscono sul comportamento dello scheduler, come il nome del flusso da elaborare e le credenziali. AWS Scheduler avvia la consegna dei record di dati dallo stream ai processori di record.
+ **Processore di record**: definisce la logica con cui l'applicazione consumer KCL elabora i dati che riceve dai flussi di dati. È necessario implementare la propria logica di elaborazione dei dati personalizzata nel processore di registrazione. Un lavoratore KCL crea un'istanza di uno scheduler. Lo scheduler crea quindi un'istanza di un processore di record per ogni shard a cui appartiene un contratto di locazione. Un lavoratore può eseguire più processori di record.
+ **Leasing**: definisce l'assegnazione tra un lavoratore e uno shard. Le applicazioni consumer di KCL utilizzano i contratti di locazione per distribuire l'elaborazione dei record di dati tra più lavoratori. Ogni frammento è vincolato a un solo lavoratore mediante un contratto di locazione alla volta e ogni lavoratore può detenere uno o più contratti di locazione contemporaneamente. Quando un lavoratore smette di essere titolare di un contratto di locazione a causa di un arresto o di un fallimento, KCL incarica un altro lavoratore di accettare il contratto di locazione. [Per ulteriori informazioni sul contratto di locazione, consulta la documentazione di Github: Lease Lifecycle.](https://github.com/awslabs/amazon-kinesis-client/blob/master/docs/lease-lifecycle.md#lease-lifecycle)
+ Tabella di **leasing: è una tabella** Amazon DynamoDB unica utilizzata per tenere traccia di tutti i leasing per l'applicazione consumer KCL. Ogni applicazione consumer KCL crea la propria tabella di leasing. La tabella dei contratti di locazione viene utilizzata per mantenere lo stato tra tutti i lavoratori e coordinare l'elaborazione dei dati. Per ulteriori informazioni, consulta [Tabelle di metadati DynamoDB e bilanciamento del carico in KCL](kcl-dynamoDB.md).
+ **Checkpointing**: è il processo di memorizzazione persistente della posizione dell'ultimo record elaborato con successo in uno shard. KCL gestisce il checkpoint per garantire che l'elaborazione possa essere ripresa dall'ultima posizione di checkpoint se un lavoratore fallisce o l'applicazione si riavvia. I checkpoint vengono archiviati nella tabella di lease di DynamoDB come parte dei metadati del leasing. Ciò consente ai lavoratori di continuare l'elaborazione dal punto in cui si era fermato il lavoratore precedente.

# Tabelle di metadati DynamoDB e bilanciamento del carico in KCL
<a name="kcl-dynamoDB"></a>

KCL gestisce i metadati dei lavoratori, come i leasing e le metriche di utilizzo della CPU. KCL tiene traccia di questi metadati utilizzando le tabelle DynamoDB. Per ogni applicazione Amazon Kinesis Data Streams, KCL crea tre tabelle DynamoDB per gestire i metadati: lease table, worker metrics table e coordinator state table.

**Nota**  
**KCL 3.x ha introdotto due nuove tabelle di metadati: le metriche dei lavoratori e le tabelle degli stati dei coordinatori.**

**Importante**  
 È necessario aggiungere le autorizzazioni appropriate per le applicazioni KCL per creare e gestire tabelle di metadati in DynamoDB. Per informazioni dettagliate, vedi [Autorizzazioni IAM richieste per le applicazioni consumer KCL](kcl-iam-permissions.md).  
L'applicazione consumer KCL non rimuove automaticamente queste tre tabelle di metadati DynamoDB. Assicurati di rimuovere queste tabelle di metadati DynamoDB create dall'applicazione consumer KCL quando disattivi l'applicazione consumer per evitare costi inutili.

## Tabella di leasing
<a name="kcl-leasetable"></a>

Una tabella di lease è una tabella Amazon DynamoDB unica utilizzata per tenere traccia degli shard presi in leasing ed elaborati dagli scheduler dell'applicazione consumer KCL. Ogni applicazione consumer KCL crea la propria tabella di leasing. Per impostazione predefinita, KCL utilizza il nome dell'applicazione consumer per il nome della tabella di leasing. È possibile impostare un nome di tabella personalizzato utilizzando la configurazione. KCL crea anche un [indice secondario globale](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GSI.html) sulla tabella di leasing con la chiave di partizione di LeaseOwner per un'efficiente individuazione del leasing. L'indice secondario globale rispecchia l'attributo LeaseKey della tabella di leasing di base. Se la tabella di leasing per l'applicazione consumer KCL non esiste all'avvio dell'applicazione, uno dei lavoratori crea la tabella di leasing per l'applicazione.

È possibile visualizzare la tabella di lease utilizzando la [console Amazon DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ConsoleDynamoDB.html) mentre l'applicazione è in esecuzione.

**Importante**  
Il nome di ogni applicazione consumer KCL deve essere univoco per evitare la duplicazione del nome della tabella di leasing. 
Il tuo account sarà addebitato per i costi associati alla tabella DynamoDB, oltre ai costi associati al flusso di dati Kinesis stesso. 

Ogni riga della tabella di leasing rappresenta uno shard che viene elaborato dagli scheduler dell'applicazione consumer. I campi chiave includono quanto segue:
+ **LeaseKey**: per l'elaborazione a flusso singolo, questo è l'ID dello shard. Per l'elaborazione multi-stream con KCL, è strutturata come. `account-id:StreamName:streamCreationTimestamp:ShardId` leaseKey è la chiave di partizione della tabella di leasing. Per ulteriori informazioni sull'elaborazione multi-stream, consulta. [Elaborazione multi-stream con KCL](kcl-multi-stream.md)
+ **checkpoint:** il numero di sequenza di checkpoint più recente per lo shard. 
+ **checkpointSubSequenceNumero:** quando si utilizza la funzione di aggregazione della Kinesis Producer Library, si tratta di un'estensione del **checkpoint** che tiene traccia dei record dei singoli utenti all'interno del record Kinesis.
+ **LeaseCounter**: utilizzato per verificare se un lavoratore sta attualmente elaborando attivamente il contratto di locazione. LeaseCounter aumenta se la proprietà del leasing viene trasferita a un altro lavoratore.
+ **LeaseOwner**: il lavoratore attuale che detiene questo contratto di locazione.
+ **ownerSwitchesSinceCheckpoint:** quante volte questo contratto di locazione ha cambiato lavoratori dall'ultimo checkpoint.
+ **parentShardId:** ID del genitore di questo shard. Si assicura che lo shard principale sia completamente elaborato prima che inizi l'elaborazione sugli shard secondari, mantenendo l'ordine di elaborazione dei record corretto.
+ **childShardId:** Elenco degli shard secondari IDs risultanti dalla divisione o dall'unione di questo shard. Utilizzato per tracciare la derivazione degli shard e gestire l'ordine di elaborazione durante le operazioni di resharding.
+ **startingHashKey:** Il limite inferiore dell'intervallo di chiavi hash per questo shard.
+ **endingHashKey:** Il limite superiore dell'intervallo di chiavi hash per questo shard.

Se utilizzi l'elaborazione multi-stream con KCL, nella tabella di leasing vengono visualizzati i due campi aggiuntivi seguenti. Per ulteriori informazioni, consulta [Elaborazione multi-stream con KCL](kcl-multi-stream.md).
+ **shardID:** l'ID della partizione.
+ **streamName:** l'identificatore del flusso di dati nel seguente formato:. `account-id:StreamName:streamCreationTimestamp`

## Tabella delle metriche dei lavoratori
<a name="kcl-worker-metrics-table"></a>

La tabella Worker Metrics è una tabella Amazon DynamoDB unica per ogni applicazione KCL e viene utilizzata per registrare i parametri di utilizzo della CPU di ogni lavoratore. Queste metriche verranno utilizzate da KCL per eseguire assegnazioni di leasing efficienti al fine di garantire un utilizzo equilibrato delle risorse tra i lavoratori. Per impostazione predefinita, KCL utilizza `KCLApplicationName-WorkerMetricStats` per il nome della tabella delle metriche dei lavoratori.

## Tabella degli stati del coordinatore
<a name="kcl-coordinator-state-table"></a>

Una tabella di stato del coordinatore è una tabella Amazon DynamoDB unica per ogni applicazione KCL e viene utilizzata per memorizzare informazioni sullo stato interno per i lavoratori. Ad esempio, la tabella degli stati del coordinatore memorizza i dati relativi all'elezione del leader o i metadati associati alla migrazione in atto da KCL 2.x a KCL 3.x. Per impostazione predefinita, KCL utilizza `KCLApplicationName-CoordinatorState` per il nome del coordinatore la tabella degli stati.

## Modalità di capacità DynamoDB per le tabelle di metadati create da KCL
<a name="kcl-capacity-mode"></a>

[Per impostazione predefinita, la Kinesis Client Library (KCL) crea tabelle di metadati DynamoDB come la tabella di lease, la tabella delle metriche dei lavoratori e la tabella dello stato del coordinatore utilizzando la modalità di capacità su richiesta.](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/on-demand-capacity-mode.html) Questa modalità ridimensiona automaticamente la capacità di lettura e scrittura per adattarsi al traffico senza richiedere la pianificazione della capacità. Ti consigliamo vivamente di mantenere la modalità di capacità come modalità on-demand per un funzionamento più efficiente di queste tabelle di metadati.

Se decidi di passare dalla tabella di leasing alla [modalità di capacità assegnata](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/provisioned-capacity-mode.html), segui queste best practice:
+ Analizza i modelli di utilizzo:
  + Monitora i modelli e gli utilizzi di lettura e scrittura della tua applicazione (RCU, WCU) utilizzando i parametri di Amazon. CloudWatch 
  + Comprendi i requisiti di velocità di picco e media.
+ Calcola la capacità richiesta:
  + Stima le unità di capacità di lettura (RCUs) e le unità di capacità di scrittura (WCUs) in base all'analisi.
  + Prendi in considerazione fattori come il numero di frammenti, la frequenza dei checkpoint e il numero di lavoratori.
+ Implementa il ridimensionamento automatico:
  + Utilizza la scalabilità [automatica di DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/provisioned-capacity-mode.html#ddb-autoscaling) per regolare automaticamente la capacità fornita e impostare limiti di capacità minimi e massimi appropriati. 
  + La scalabilità automatica di DynamoDB ti aiuterà a evitare che la tabella di metadati KCL raggiunga il limite di capacità e venga limitata.
+ Monitoraggio e ottimizzazione regolari:
  + Monitora continuamente le CloudWatch metriche per`ThrottledRequests`.
  + Regola la capacità man mano che il carico di lavoro cambia nel tempo.

Se riscontri un problema `ProvisionedThroughputExceededException` nelle tabelle DynamoDB dei metadati per la tua applicazione consumer KCL, devi aumentare la capacità di throughput assegnata della tabella DynamoDB. Se imposti un certo livello di unità di capacità di lettura (RCU) e di unità di capacità di scrittura (WCU) quando crei per la prima volta l'applicazione consumer, potrebbe non essere sufficiente man mano che l'utilizzo aumenta. Ad esempio, se la tua applicazione consumer KCL esegue checkpoint frequenti o funziona su uno stream con molti shard, potresti aver bisogno di più unità di capacità. Per informazioni sulla velocità effettiva assegnata in DynamoDB, consulta la [capacità di throughput di DynamoDB e l'[aggiornamento](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithTables.Basics.html#WorkingWithTables.Basics.UpdateTable) di una tabella nella Amazon DynamoDB Developer](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/capacity-mode.html) Guide.

## In che modo KCL assegna i contratti di locazione ai lavoratori e bilancia il carico
<a name="kcl-assign-leases"></a>

KCL raccoglie e monitora continuamente i parametri di utilizzo della CPU dagli host di elaborazione che eseguono i worker per garantire una distribuzione uniforme del carico di lavoro. Queste metriche di utilizzo della CPU sono memorizzate nella tabella delle metriche dei lavoratori in DynamoDB. Se KCL rileva che alcuni lavoratori mostrano tassi di utilizzo della CPU più elevati rispetto ad altri, riassegnerà i contratti di locazione tra i lavoratori per ridurre il carico sui lavoratori più utilizzati. L'obiettivo è bilanciare il carico di lavoro in modo più uniforme su tutto il parco di applicazioni consumer, evitando che ogni singolo lavoratore si sovraccarichi. Poiché KCL distribuisce l'utilizzo della CPU in tutto il parco di applicazioni consumer, è possibile dimensionare correttamente la capacità del parco di applicazioni consumer scegliendo il numero giusto di dipendenti o utilizzare la scalabilità automatica per gestire in modo efficiente la capacità di elaborazione e ottenere costi inferiori.

**Importante**  
KCL può raccogliere i parametri di utilizzo della CPU dai lavoratori solo se vengono soddisfatti determinati prerequisiti. Per informazioni dettagliate, vedi [Prerequisiti](develop-kcl-consumers-java.md#develop-kcl-consumers-java-prerequisites). Se KCL non è in grado di raccogliere i parametri di utilizzo della CPU dai lavoratori, KCL ricorrerà alla velocità effettiva per lavoratore per assegnare i contratti di locazione e bilanciare il carico tra i lavoratori della flotta. KCL monitorerà la produttività che ogni lavoratore riceve in un determinato momento e riassegnerà i leasing per assicurarsi che ogni lavoratore ottenga un livello di produttività totale simile dai leasing assegnati.

# Sviluppa i consumatori con KCL
<a name="develop-kcl-consumers"></a>

Puoi utilizzare la Kinesis Client Library (KCL) per creare applicazioni consumer che elaborano i dati dai tuoi flussi di dati Kinesis.

KCL è disponibile in più lingue. Questo argomento spiega come sviluppare utenti KCL in linguaggi Java e non Java.
+ [Per visualizzare il riferimento Javadoc di Kinesis Client Library, consulta Amazon Kinesis Client Library Javadoc.](https://javadoc.io/doc/software.amazon.kinesis/amazon-kinesis-client/latest/index.html)
+ Per scaricare KCL per Java da GitHub, consulta [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) for Java.
+ [Per individuare KCL per Java su Apache Maven, consulta il KCL Maven Central Repository.](https://central.sonatype.com/artifact/software.amazon.kinesis/amazon-kinesis-client)

**Topics**
+ [Sviluppa i consumatori con KCL in Java](develop-kcl-consumers-java.md)
+ [Sviluppa i consumatori con KCL in linguaggi non Java](develop-kcl-consumers-non-java.md)

# Sviluppa i consumatori con KCL in Java
<a name="develop-kcl-consumers-java"></a>

## Prerequisiti
<a name="develop-kcl-consumers-java-prerequisites"></a>

Prima di iniziare a utilizzare KCL 3.x, assicurati di disporre di quanto segue:
+ Java Development Kit (JDK) 8 o versione successiva
+ AWS SDK per Java 2.x
+ Maven o Gradle per la gestione delle dipendenze

KCL raccoglie i parametri di utilizzo della CPU, come l'utilizzo della CPU, dall'host di elaborazione su cui lavorano i lavoratori per bilanciare il carico e raggiungere un livello di utilizzo uniforme delle risorse tra i lavoratori. Per consentire a KCL di raccogliere i parametri di utilizzo della CPU dai lavoratori, è necessario soddisfare i seguenti prerequisiti:

 **Amazon Elastic Compute Cloud(Amazon EC2)**
+ Il sistema operativo deve essere Linux OS.
+ È necessario abilitarlo [IMDSv2](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html)nella propria istanza EC2.

 **Amazon Elastic Container Service (Amazon ECS) su Amazon EC2**
+ Il sistema operativo deve essere Linux OS.
+ È necessario abilitare l'endpoint [ECS Task Metadata Endpoint versione 4](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ec2-metadata.html). 
+ La versione dell'agente container Amazon ECS deve essere 1.39.0 o successiva.

 **Amazon ECS su AWS Fargate**
+ È necessario abilitare l'[endpoint dei metadati delle attività Fargate](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4-fargate.html) versione 4. Se si utilizza la versione 1.4.0 o successiva della piattaforma Fargate, questa opzione è abilitata per impostazione predefinita. 
+ Piattaforma Fargate versione 1.4.0 o successiva.

 **Amazon Elastic Kubernetes Service (Amazon EKS) su Amazon EC2** 
+ Il sistema operativo deve essere Linux OS.

 **Amazon EKS su AWS Fargate**
+ Piattaforma Fargate 1.3.0 o versione successiva.

**Importante**  
Se KCL non è in grado di raccogliere i parametri di utilizzo della CPU dai lavoratori, ricorrerà alla velocità effettiva per lavoratore per assegnare i contratti di locazione e bilanciare il carico tra i lavoratori della flotta. Per ulteriori informazioni, consulta [In che modo KCL assegna i contratti di locazione ai lavoratori e bilancia il carico](kcl-dynamoDB.md#kcl-assign-leases).

## Installa e aggiungi dipendenze
<a name="develop-kcl-consumers-java-installation"></a>

Se usi Maven, aggiungi la seguente dipendenza al tuo file. `pom.xml` Assicurati di aver sostituito 3.x.x con l'ultima versione di KCL. 

```
<dependency>
    <groupId>software.amazon.kinesis</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>3.x.x</version> <!-- Use the latest version -->
</dependency>
```

Se stai usando Gradle, aggiungi quanto segue al tuo file. `build.gradle` Assicurati di aver sostituito 3.x.x con l'ultima versione di KCL. 

```
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
```

[Puoi verificare la versione più recente di KCL sul Maven Central Repository.](https://search.maven.org/artifact/software.amazon.kinesis/amazon-kinesis-client)

## Implementa il consumatore
<a name="develop-kcl-consumers-java-implemetation"></a>

Un'applicazione consumer KCL è composta dai seguenti componenti chiave:

**Topics**
+ [RecordProcessor](#implementation-recordprocessor)
+ [RecordProcessorFactory](#implementation-recordprocessorfactory)
+ [Pianificatore](#implementation-scheduler)
+ [Applicazione principale per i consumatori](#implementation-main)

### RecordProcessor
<a name="implementation-recordprocessor"></a>

RecordProcessor è il componente principale in cui risiede la logica aziendale per l'elaborazione dei record del flusso di dati Kinesis. Definisce in che modo l'applicazione elabora i dati che riceve dal flusso Kinesis.

Responsabilità chiave:
+ Inizializza l'elaborazione per uno shard
+ Elabora batch di record dal flusso Kinesis
+ Arresta l'elaborazione di uno shard (ad esempio, quando lo shard si divide o si fonde o il leasing viene ceduto a un altro host)
+ Gestisci i checkpoint per monitorare i progressi

Di seguito viene illustrato un esempio di implementazione:

```
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.*;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class SampleRecordProcessor implements ShardRecordProcessor {
    private static final String SHARD_ID_MDC_KEY = "ShardId";
    private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);
    private String shardId;

    @Override
    public void initialize(InitializationInput initializationInput) {
        shardId = initializationInput.shardId();
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Processing {} record(s)", processRecordsInput.records().size());
            processRecordsInput.records().forEach(r -> 
                log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber())
            );
            
            // Checkpoint periodically
            processRecordsInput.checkpointer().checkpoint();
        } catch (Throwable t) {
            log.error("Caught throwable while processing records. Aborting.", t);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Lost lease, so terminating.");
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Reached shard end checkpointing.");
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at shard end. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Scheduler is shutting down, checkpointing.");
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }
}
```

Di seguito è riportata una spiegazione dettagliata di ogni metodo utilizzato nell'esempio:

**inizializza (InitializationInputinitializationInput)**
+ Scopo: impostare le risorse o lo stato necessari per l'elaborazione dei record.
+ Quando viene chiamato: una volta, quando KCL assegna uno shard a questo registratore.
+ Punti chiave:
  + `initializationInput.shardId()`: L'ID dello shard che questo processore gestirà.
  + `initializationInput.extendedSequenceNumber()`: Il numero di sequenza da cui iniziare l'elaborazione.

**ProcessRecords () ProcessRecordsInput processRecordsInput**
+ Scopo: Elaborare i record in entrata e, facoltativamente, controllare l'avanzamento del checkpoint.
+ Quando viene chiamato: ripetutamente, purché il processore di dischi detenga il contratto di locazione dello shard.
+ Punti chiave:
  + `processRecordsInput.records()`: Elenco dei record da elaborare.
  + `processRecordsInput.checkpointer()`: utilizzato per controllare lo stato di avanzamento.
  + Assicurati di aver gestito tutte le eccezioni durante l'elaborazione per evitare che KCL fallisca.
  + Questo metodo dovrebbe essere idempotente, poiché lo stesso record può essere elaborato più di una volta in alcuni scenari, ad esempio dati che non sono stati sottoposti a checkpoint prima che un lavoratore si blocchi o riavvii imprevisti.
  + Svuota sempre tutti i dati memorizzati nel buffer prima del checkpoint per garantire la coerenza dei dati.

**LeaseLostInput leaseLostInputleaseLost ()**
+ Scopo: ripulire tutte le risorse specifiche per l'elaborazione di questo frammento.
+ Quando viene chiamato: quando un altro Scheduler assume il contratto di locazione di questo shard.
+ Punti chiave:
  + Il checkpoint non è consentito con questo metodo.

**shardEnded () ShardEndedInput shardEndedInput**
+ Scopo: completare l'elaborazione di questo frammento e di questo checkpoint.
+ Quando viene chiamato: quando lo shard si divide o si fonde, indica che tutti i dati dello shard sono stati elaborati.
+ Punti chiave:
  + `shardEndedInput.checkpointer()`: utilizzato per eseguire il checkpoint finale.
  + Il checkpoint in questo metodo è obbligatorio per completare l'elaborazione.
  + Il mancato ripristino dei dati e del checkpoint in questo campo può comportare la perdita dei dati o un'elaborazione duplicata alla riapertura dello shard.

**ShutdownRequestedInput shutdownRequestedInputShutdownRequested ()**
+ Scopo: controllare e ripulire le risorse quando KCL si spegne.
+ Quando viene chiamato: quando KCL si spegne, ad esempio quando l'applicazione viene chiusa).
+ Punti chiave:
  + `shutdownRequestedInput.checkpointer()`: utilizzato per eseguire il checkpoint prima dello spegnimento.
  + Assicurati di aver implementato il checkpoint nel metodo in modo che i progressi vengano salvati prima dell'interruzione dell'applicazione.
  + Il mancato ripristino dei dati e del checkpoint in questo campo può comportare la perdita dei dati o la rielaborazione dei record al riavvio dell'applicazione.

**Importante**  
KCL 3.x garantisce un minor numero di ritrattamenti dei dati quando il contratto di locazione viene ceduto da un lavoratore a un altro lavoratore effettuando un checkpoint prima che il lavoratore precedente venga chiuso. Se non implementate la logica di checkpoint nel `shutdownRequested()` metodo, non vedrete questo vantaggio. Assicurati di aver implementato una logica di checkpoint all'interno del metodo. `shutdownRequested()`

### RecordProcessorFactory
<a name="implementation-recordprocessorfactory"></a>

RecordProcessorFactory è responsabile della creazione di nuove RecordProcessor istanze. KCL utilizza questa factory per crearne una nuova RecordProcessor per ogni shard che l'applicazione deve elaborare.

Responsabilità chiave:
+ Crea nuove RecordProcessor istanze su richiesta
+ Assicurati che ognuna RecordProcessor sia inizializzata correttamente

Di seguito è riportato un esempio di implementazione:

```
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new SampleRecordProcessor();
    }
}
```

In questo esempio, la factory ne crea una nuova SampleRecordProcessor ogni volta che viene chiamata shardRecordProcessor (). È possibile estenderlo per includere qualsiasi logica di inizializzazione necessaria.

### Pianificatore
<a name="implementation-scheduler"></a>

Scheduler è un componente di alto livello che coordina tutte le attività dell'applicazione KCL. È responsabile dell'orchestrazione complessiva dell'elaborazione dei dati.

Responsabilità chiave:
+ Gestire il ciclo di vita di RecordProcessors
+ Gestisci la gestione del leasing per i frammenti
+ Coordina il checkpoint
+ Bilanciate il carico di elaborazione degli shard tra i diversi worker della vostra applicazione
+ Gestisci segnali di spegnimento e terminazione delle applicazioni senza interruzioni

Scheduler viene in genere creato e avviato nell'applicazione principale. È possibile controllare l'esempio di implementazione di Scheduler nella sezione seguente, Main Consumer Application. 

### Applicazione principale per i consumatori
<a name="implementation-main"></a>

L'applicazione principale per i consumatori collega tutti i componenti. È responsabile della configurazione del consumatore KCL, della creazione dei client necessari, della configurazione dello Scheduler e della gestione del ciclo di vita dell'applicazione.

Responsabilità chiave:
+ Configurare i client AWS di servizio (Kinesis, DynamoDB,) CloudWatch
+ Configura l'applicazione KCL
+ Crea e avvia lo Scheduler
+ Gestisci l'arresto dell'applicazione

Di seguito è riportato un esempio di implementazione:

```
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import java.util.UUID;

public class SampleConsumer {
    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    public SampleConsumer(String streamName, Region region) {
        this.streamName = streamName;
        this.region = region;
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    public void run() {
        DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        
        ConfigsBuilder configsBuilder = new ConfigsBuilder(
            streamName, 
            streamName, 
            kinesisClient, 
            dynamoDbAsyncClient,
            cloudWatchClient, 
            UUID.randomUUID().toString(), 
            new SampleRecordProcessorFactory()
        );

        Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();
    }

    public static void main(String[] args) {
        String streamName = "your-stream-name"; // replace with your stream name
        Region region = Region.US_EAST_1; // replace with your region
        new SampleConsumer(streamName, region).run();
    }
}
```

 Per impostazione predefinita, KCL crea un consumatore Enhanced Fan-out (EFO) con throughput dedicato. Per ulteriori informazioni su Enhanced Fan-out, consulta. [Sviluppa consumatori con fan-out migliorati con un throughput dedicato](enhanced-consumers.md) Se hai meno di 2 consumatori o non hai bisogno di ritardi di propagazione in lettura inferiori a 200 ms, devi impostare la seguente configurazione nell'oggetto scheduler per utilizzare i consumatori a throughput condiviso:

```
configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
```

Il codice seguente è un esempio di creazione di un oggetto scheduler che utilizza consumatori a throughput condiviso:

**Importazioni:**

```
import software.amazon.kinesis.retrieval.polling.PollingConfig;
```

**Codice**:

```
Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );/
```

# Sviluppa i consumatori con KCL in linguaggi non Java
<a name="develop-kcl-consumers-non-java"></a>

Questa sezione tratta l'implementazione dei consumatori che utilizzano Kinesis Client Library (KCL) in Python, Node.js, .NET e Ruby.

KCL è una libreria Java. Il supporto per linguaggi diversi da Java viene fornito utilizzando un'interfaccia multilingue chiamata. `MultiLangDaemon` Questo demone è basato su Java e viene eseguito in background quando si utilizza un KCL con un linguaggio diverso da Java. Pertanto, se installi KCL per linguaggi non Java e scrivi la tua app consumer interamente in linguaggi non Java, avrai comunque bisogno che Java sia installato sul tuo sistema a causa di. `MultiLangDaemon` Inoltre, `MultiLangDaemon` presenta alcune impostazioni predefinite che potresti dover personalizzare per il tuo caso d'uso (ad esempio, la regione AWS a cui si connette). Per ulteriori informazioni su `MultiLangDaemon` on GitHub, consulta il [ MultiLangDaemon progetto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Sebbene i concetti fondamentali rimangano gli stessi in tutte le lingue, ci sono alcune considerazioni e implementazioni specifiche per ciascuna lingua. Per i concetti fondamentali sullo sviluppo di KCL per i consumatori, vedi. [Sviluppa i consumatori con KCL in Java](develop-kcl-consumers-java.md) Per informazioni più dettagliate su come sviluppare consumatori KCL in Python, Node.js, .NET e Ruby e sugli ultimi aggiornamenti, fai riferimento ai seguenti repository: GitHub 
+ Python: [amazon-kinesis-client-python](https://github.com/awslabs/amazon-kinesis-client-python)
+ Node.js: [amazon-kinesis-client-nodejs](https://github.com/awslabs/amazon-kinesis-client-nodejs)
+ .NET: [amazon-kinesis-client-net](https://github.com/awslabs/amazon-kinesis-client-net)
+ Rubino: [amazon-kinesis-client-ruby](https://github.com/awslabs/amazon-kinesis-client-ruby)

**Importante**  
Non utilizzare le seguenti versioni della libreria KCL non Java se utilizzi JDK 8. Queste versioni contengono una dipendenza (logback) incompatibile con JDK 8.  
KCL Python 3.0.2 e 2.2.0
KCL Node.js 2.3.0
KCL.NET 3.1.0
KCL Ruby 2.2.0
Si consiglia di utilizzare versioni rilasciate prima o dopo queste versioni interessate quando si lavora con JDK 8.

# Elaborazione multi-stream con KCL
<a name="kcl-multi-stream"></a>

Questa sezione descrive le modifiche richieste in KCL che consentono di creare applicazioni consumer KCL in grado di elaborare più di un flusso di dati contemporaneamente.
**Importante**  
L'elaborazione multi-stream è supportata solo in KCL 2.3 o versioni successive.
L'elaborazione multi-stream *non* è supportata per i consumatori KCL scritti in linguaggi non Java che funzionano con. `multilangdaemon`
L'elaborazione multi-stream *non* è supportata in nessuna versione di KCL 1.x.
+ **MultistreamTracker interfaccia**
  + Per creare un'applicazione consumer in grado di elaborare più flussi contemporaneamente, è necessario implementare una nuova interfaccia denominata [MultistreamTracker](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java). Questa interfaccia include il metodo `streamConfigList` che restituisce l'elenco dei flussi di dati e le relative configurazioni che devono essere elaborati dall'applicazione consumer KCL. Si noti che i flussi di dati in fase di elaborazione possono essere modificati durante il runtime dell'applicazione consumer. `streamConfigList`viene chiamato periodicamente da KCL per conoscere le modifiche nei flussi di dati da elaborare.
  + Compila `streamConfigList` l'elenco. [StreamConfig](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java#L23)

  ```
  package software.amazon.kinesis.common;
  
  import lombok.Data;
  import lombok.experimental.Accessors;
  
  @Data
  @Accessors(fluent = true)
  public class StreamConfig {
      private final StreamIdentifier streamIdentifier;
      private final InitialPositionInStreamExtended initialPositionInStreamExtended;
      private String consumerArn;
  }
  ```
  + I campi `StreamIdentifier` e `InitialPositionInStreamExtended` sono obbligatori, mentre `consumerArn` è facoltativo. È necessario fornire i dati `consumerArn` solo se si utilizza KCL per implementare un'applicazione utente fan-out avanzata.
  + Per ulteriori informazioni su`StreamIdentifier`, vedere [https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java](https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129) \$1L129. Per creare un`StreamIdentifier`, ti consigliamo di creare un'istanza multistream da `streamArn` and the `streamCreationEpoch` disponibile in KCL 2.5.0 o versioni successive. In KCL v2.3 e v2.4, che non supportano`streamArm`, crea un'istanza multistream utilizzando il formato. `account-id:StreamName:streamCreationTimestamp` Questo formato sarà obsoleto e non sarà più supportato a partire dalla prossima versione principale.
  +  MultistreamTracker include anche una strategia per eliminare i leasing di vecchi stream nella tabella dei lease (). formerStreamsLeases DeletionStrategy Si noti che la strategia NON PUÒ essere modificata durante il runtime dell'applicazione consumer. [Per ulteriori informazioni, vedete /blob/0c5042dadf794fe988438436252a5a8fe70b6b0 .java. https://github.com/awslabs/ amazon-kinesis-client b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java)
+   [ConfigsBuilder](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java)è una classe a livello di applicazione che puoi utilizzare per specificare tutte le impostazioni di configurazione KCL da utilizzare durante la creazione dell'applicazione consumer KCL per KCL versione 2.x o successiva. `ConfigsBuilder`la classe ora supporta l'interfaccia. `MultistreamTracker` Puoi inizializzarli ConfigsBuilder entrambi con il nome dell'unico flusso di dati da cui consumare i record da: 

  ```
  /**
       * Constructor to initialize ConfigsBuilder with StreamName
       * @param streamName
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.right(streamName);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```  

Oppure puoi inizializzare ConfigsBuilder con `MultiStreamTracker` se desideri implementare un'applicazione consumer KCL che elabora più flussi contemporaneamente.

```
* Constructor to initialize ConfigsBuilder with MultiStreamTracker
     * @param multiStreamTracker
     * @param applicationName
     * @param kinesisClient
     * @param dynamoDBClient
     * @param cloudWatchClient
     * @param workerIdentifier
     * @param shardRecordProcessorFactory
     */
    public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName,
            @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
            @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
            @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
        this.appStreamTracker = Either.left(multiStreamTracker);
        this.applicationName = applicationName;
        this.kinesisClient = kinesisClient;
        this.dynamoDBClient = dynamoDBClient;
        this.cloudWatchClient = cloudWatchClient;
        this.workerIdentifier = workerIdentifier;
        this.shardRecordProcessorFactory = shardRecordProcessorFactory;
    }
```
+ Con il supporto multi-stream implementato per la tua applicazione consumer KCL, ogni riga della tabella di lease dell'applicazione ora contiene l'ID dello shard e il nome del flusso dei molteplici flussi di dati elaborati da questa applicazione.
+ Quando viene implementato il supporto multi-stream per la tua applicazione consumer KCL, LeaseKey assume la seguente struttura:. `account-id:StreamName:streamCreationTimestamp:ShardId` Ad esempio, `111111111:multiStreamTest-1:12345:shardId-000000000336`.

**Importante**  
Quando la tua applicazione consumer KCL esistente è configurata per elaborare solo un flusso di dati, `leaseKey` (che è la chiave di partizione per la tabella di leasing) è lo shard ID. Se riconfigurate un'applicazione consumer KCL esistente per elaborare più flussi di dati, la tabella di leasing si interrompe, perché la `leaseKey` struttura deve essere la seguente: per supportare il multi-stream. `account-id:StreamName:StreamCreationTimestamp:ShardId`

# Usa il registro Schema con KCL AWS Glue
<a name="kcl-glue-schema"></a>

È possibile integrare Kinesis Data Streams AWS Glue con il registro Schema. Il registro AWS Glue Schema consente di scoprire, controllare ed evolvere centralmente gli schemi, garantendo al contempo che i dati prodotti siano continuamente convalidati da uno schema registrato. Uno schema definisce la struttura e il formato di un registro di dati. Uno schema è una specifica con versioni per la pubblicazione, il consumo o l'archiviazione dei dati in modo affidabile. Il registro AWS Glue Schema consente di migliorare la qualità e la governance end-to-end dei dati all'interno delle applicazioni di streaming. Per ulteriori informazioni, consulta [Registro degli schemi di AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). Uno dei modi per configurare questa integrazione è tramite KCL per Java.

**Importante**  
AWS Glue L'integrazione del registro Schema per Kinesis Data Streams è supportata solo in KCL 2.3 o versioni successive.
AWS Glue L'integrazione del registro dello schema per Kinesis Data *Streams* non è supportata per gli utenti KCL scritti in linguaggi non Java che funzionano con. `multilangdaemon`
AWS Glue L'integrazione del registro dello schema per Kinesis Data *Streams* non è supportata in nessuna versione di KCL 1.x.

Per istruzioni dettagliate su come configurare l'integrazione di Kinesis Data Streams con il registro AWS Glue Schema utilizzando KCL, consulta la sezione «Interazione con i dati utilizzando le KPL/KCL librerie» in [Caso d'uso: integrazione di Amazon Kinesis Data Streams](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds) con lo Schema Registry. AWS Glue 

# Autorizzazioni IAM richieste per le applicazioni consumer KCL
<a name="kcl-iam-permissions"></a>

 Devi aggiungere le seguenti autorizzazioni al ruolo o all'utente IAM associato alla tua applicazione consumer KCL. 

 Le migliori pratiche di sicurezza per AWS imporre l'uso di autorizzazioni granulari per controllare l'accesso a diverse risorse. AWS Identity and Access Management (IAM) consente di gestire gli utenti e le autorizzazioni degli utenti in. AWS Una policy IAM elenca in modo esplicito le operazioni consentite e le risorse per le quali sono applicabili le operazioni.

La tabella seguente mostra le autorizzazioni IAM minime generalmente richieste per le applicazioni consumer KCL:


**Autorizzazioni IAM minime per le applicazioni consumer KCL**  

| Servizio | Azioni | Risorse () ARNs | Scopo | 
| --- | --- | --- | --- | 
| Flusso di dati Amazon Kinesis |  `DescribeStream` `DescribeStreamSummary` `RegisterStreamConsumer`  |  Flusso di dati Kinesis da cui l'applicazione KCL elaborerà i dati.`arn:aws:kinesis:region:account:stream/StreamName`  |  Prima di leggere i record, il consumer verifica se il flusso di dati esiste, è attivo e contiene il flusso di dati. Registra i consumatori su uno shard.  | 
| Flusso di dati Amazon Kinesis |  `GetRecords` `GetShardIterator` `ListShards`  | Flusso di dati Kinesis da cui l'applicazione KCL elaborerà i dati.`arn:aws:kinesis:region:account:stream/StreamName` |  Legge record da uno shard.  | 
| Flusso di dati Amazon Kinesis |  `SubscribeToShard` `DescribeStreamConsumer` |  Flusso di dati Kinesis da cui l'applicazione KCL elaborerà i dati. Aggiungi questa azione solo se utilizzi utenti Enhanced Fan-Out (EFO). `arn:aws:kinesis:region:account:stream/StreamName/consumer/*`  |  Si abbona a uno shard per consumatori Enhanced Fan-out (EFO).  | 
| Amazon DynamoDB |  `CreateTable` `DescribeTable` `UpdateTable` `Scan` `GetItem` `PutItem` `UpdateItem` `DeleteItem`  |  Tabella di leasing (tabella dei metadati) in DynamoDB creata da KCL. `arn:aws:dynamodb:region:account:table/KCLApplicationName`  |  Queste azioni sono necessarie per consentire a KCL di gestire la tabella di lease creata in DynamoDB.  | 
| Amazon DynamoDB |  `CreateTable` `DescribeTable` `Scan` `GetItem` `PutItem` `UpdateItem` `DeleteItem`  |  Metriche dei lavoratori e tabella dello stato del coordinatore (tabelle di metadati in DynamoDB) creata da KCL. `arn:aws:dynamodb:region:account:table/KCLApplicationName-WorkerMetricStats` `arn:aws:dynamodb:region:account:table/KCLApplicationName-CoordinatorState`  |  Queste azioni sono necessarie per consentire a KCL di gestire le metriche dei lavoratori e le tabelle dei metadati dello stato del coordinatore in DynamoDB.  | 
| Amazon DynamoDB | `Query` |  Indice secondario globale nella tabella delle locazioni. `arn:aws:dynamodb:region:account:table/KCLApplicationName/index/*`  |  Questa azione è necessaria affinché KCL legga l'indice secondario globale della tabella di lease creata in DynamoDB.  | 
| Amazon CloudWatch | `PutMetricData` |  \$1  |  Carica metriche utili per monitorare l'applicazione. CloudWatch L'asterisco (\$1) viene utilizzato perché non esiste una risorsa specifica in CloudWatch cui viene richiamata l'azione. `PutMetricData`   | 

**Nota**  
Sostituisci «region», «account»StreamName, "» e "KCLApplicationName» ARNs con il tuo Account AWS numero Regione AWS, il nome del flusso di dati Kinesis e il nome dell'applicazione KCL rispettivamente. KCL 3.x crea altre due tabelle di metadati in DynamoDB. Per informazioni dettagliate sulle tabelle di metadati DynamoDB create da KCL, vedere. [Tabelle di metadati DynamoDB e bilanciamento del carico in KCL](kcl-dynamoDB.md) Se utilizzi configurazioni per personalizzare i nomi delle tabelle di metadati create da KCL, usa i nomi di tabella specificati anziché il nome dell'applicazione KCL. 

Di seguito è riportato un esempio di documento politico per un'applicazione consumer KCL. 

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer",
                "kinesis:GetRecords",
                "kinesis:GetShardIterator",
                "kinesis:ListShards"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/STREAM_NAME"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamConsumer"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/STREAM_NAME/consumer/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DescribeTable",
                "dynamodb:UpdateTable",
                "dynamodb:GetItem",
                "dynamodb:UpdateItem",
                "dynamodb:PutItem",
                "dynamodb:DeleteItem",
                "dynamodb:Scan"
            ],
            "Resource": [
            "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DescribeTable",
                "dynamodb:GetItem",
                "dynamodb:UpdateItem",
                "dynamodb:PutItem",
                "dynamodb:DeleteItem",
                "dynamodb:Scan"
            ],
            "Resource": [
            "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME-WorkerMetricStats",
    "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME-CoordinatorState"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:Query"
            ],
            "Resource": [
            "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME/index/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": "*"
        }
    ]
}
```

------

Prima di utilizzare questa politica di esempio, controlla i seguenti elementi:
+ Sostituisci REGION con il tuo Regione AWS (ad esempio, us-east-1).
+ Sostituisci ACCOUNT\$1ID con il tuo ID. Account AWS 
+ Sostituisci STREAM\$1NAME con il nome del tuo flusso di dati Kinesis.
+ Sostituisci CONSUMER\$1NAME con il nome del tuo consumatore, in genere il nome dell'applicazione quando usi KCL.
+ Sostituisci KCL\$1APPLICATION\$1NAME con il nome della tua applicazione KCL.

# Configurazioni KCL
<a name="kcl-configuration"></a>

Puoi impostare le proprietà di configurazione per personalizzare la funzionalità di Kinesis Client Library per soddisfare i tuoi requisiti specifici. La tabella seguente descrive le proprietà e le classi di configurazione.

**Importante**  
In KCL 3.x, l'algoritmo di bilanciamento del carico mira a ottenere un utilizzo uniforme della CPU tra i lavoratori, non un numero uguale di leasing per lavoratore. Se impostato su un valore `maxLeasesForWorker` troppo basso, potresti limitare la capacità di KCL di bilanciare efficacemente il carico di lavoro. Se utilizzi la `maxLeasesForWorker` configurazione, valuta la possibilità di aumentarne il valore per consentire la migliore distribuzione del carico possibile.


**Questa tabella mostra le proprietà di configurazione per KCL**  

| Proprietà di configurazione | Classe di configurazione | Description | Valore predefinito | 
| --- | --- | --- | --- | 
| applicationName | ConfigsBuilder | Il nome per l'applicazione della KCL. Utilizzato come predefinito per tableName e consumerName. | Non applicabile | 
| tableName | ConfigsBuilder |  Consente di ignorare il nome della tabella utilizzato per la tabella di lease di Amazon DynamoDB.  | Non applicabile | 
| streamName | ConfigsBuilder |  Il nome del flusso dal quale l'applicazione elabora i record.  | Non applicabile | 
| workerIdentifier | ConfigsBuilder |  Un identificatore univoco che rappresenta la creazione dell'elaboratore di applicazione. Deve essere univoco.  | Non applicabile | 
| failoverTimeMillis | LeaseManagementConfig |  Il numero di millisecondi che devono passare prima di poter considerare un proprietario di lease come fallito. Per le applicazioni con un numero elevato di shard, questo può essere impostato su un numero più alto per ridurre il numero di IOPS DynamoDB necessari per il tracciamento dei leasing.  | 10.000 (10 secondi) | 
| shardSyncIntervalMillis | LeaseManagementConfig |  Il periodo di tempo tra le chiamate di sincronizzazione dello shard.  | 60.000 (60 secondi) | 
| cleanupLeasesUponShardCompletion | LeaseManagementConfig |  Quando impostati, i lease vengono rimossi non appena i lease figlio hanno iniziato l'elaborazione.  | TRUE | 
| ignoreUnexpectedChildShards | LeaseManagementConfig |  Quando impostato, i shard figlio che hanno un shard aperto vengono ignorati. Questo è principalmente per DynamoDB Streams.  | FALSE | 
| maxLeasesForWorker | LeaseManagementConfig |  Il numero massimo di contratti di locazione che un singolo lavoratore deve accettare. Un valore troppo basso può causare la perdita di dati se i lavoratori non sono in grado di elaborare tutti gli shard e portare a un'assegnazione del leasing non ottimale tra i lavoratori. Al momento della configurazione, tenete conto del numero totale di shard, del numero di lavoratori e della capacità di elaborazione dei worker.  | Illimitato | 
| maxLeaseRenewalThreads | LeaseManagementConfig |  Controlla le dimensioni del pool di thread di rinnovo del lease. Quanto maggiori sono i lease che può richiedere l'applicazione, tanto più grande deve essere questo pool.  | 20 | 
| billingMode | LeaseManagementConfig |  Determina la modalità di capacità della tabella di leasing creata in DynamoDB. Sono disponibili due opzioni: modalità on-demand (PAY\$1PER\$1REQUEST) e modalità provisioned. Si consiglia di utilizzare l'impostazione predefinita della modalità on demand, poiché si ridimensiona automaticamente per adattarsi al carico di lavoro senza la necessità di pianificare la capacità.  | PAY\$1PER\$1REQUEST (modalità su richiesta) | 
| initialLeaseTableReadCapacity | LeaseManagementConfig | La capacità di lettura di DynamoDB utilizzata se la Kinesis Client Library deve creare una nuova tabella di lease DynamoDB con modalità di capacità fornita. È possibile ignorare questa configurazione se si utilizza la modalità di capacità on-demand predefinita nella configurazione. billingMode | 10 | 
| initialLeaseTableWriteCapacity | LeaseManagementConfig | La capacità di lettura di DynamoDB utilizzata se la Kinesis Client Library deve creare una nuova tabella di lease DynamoDB. È possibile ignorare questa configurazione se si utilizza la modalità di capacità on-demand predefinita nella configurazione. billingMode | 10 | 
| initialPositionInStreamExtended | LeaseManagementConfig |  La posizione iniziale nel flusso nella quale l'applicazione dovrebbe iniziare. Questo viene utilizzato soltanto durante la creazione del lease iniziale.  |  InitialPositionInStream.TRIM\$1HORIZON  | 
| reBalanceThresholdPercentage | LeaseManagementConfig |  Un valore percentuale che determina quando l'algoritmo di bilanciamento del carico deve prendere in considerazione la riassegnazione degli shard tra i lavoratori. Questa è una nuova configurazione introdotta in KCL 3.x.  | 10 | 
| dampeningPercentage | LeaseManagementConfig |  Un valore percentuale utilizzato per attenuare la quantità di carico che verrà spostata dal lavoratore sovraccarico in un'unica operazione di ribilanciamento. Questa è una nuova configurazione introdotta in KCL 3.x.  | 60 | 
| allowThroughputOvershoot | LeaseManagementConfig |  Determina se è ancora necessario richiedere un leasing aggiuntivo al lavoratore sovraccarico, anche se ciò fa sì che la quantità totale della produzione di leasing utilizzata superi la quantità di produzione desiderata. Questa è una nuova configurazione introdotta in KCL 3.x.  | TRUE | 
| disableWorkerMetrics | LeaseManagementConfig |  Determina se KCL deve ignorare le metriche relative alle risorse fornite dai lavoratori (come l'utilizzo della CPU) durante la riassegnazione dei leasing e il bilanciamento del carico. Impostalo su TRUE se vuoi impedire a KCL di bilanciare il carico in base all'utilizzo della CPU. Questa è una nuova configurazione introdotta in KCL 3.x.  | FALSE | 
| maxThroughputPerHostKBps | LeaseManagementConfig |  Importo della produttività massima da assegnare a un lavoratore durante l'assegnazione del leasing. Questa è una nuova configurazione introdotta in KCL 3.x.  | Illimitato | 
| isGracefulLeaseHandoffEnabled | LeaseManagementConfig |  Controlla il comportamento della cessione del contratto di locazione tra i lavoratori. Se impostato su true, KCL tenterà di trasferire i contratti di locazione in modo corretto concedendo allo shard il tempo RecordProcessor sufficiente per completare l'elaborazione prima di cedere il contratto di locazione a un altro lavoratore. Ciò può contribuire a garantire l'integrità dei dati e le transizioni fluide, ma può aumentare i tempi di consegna. Se impostato su false, il contratto di locazione verrà ceduto immediatamente senza attendere che RecordProcessor venga chiuso correttamente. Ciò può portare a consegne più rapide, ma può comportare il rischio di un'elaborazione incompleta. Nota: il checkpointing deve essere implementato all'interno del metodo shutdownRequested () di RecordProcessor per trarre vantaggio dalla funzionalità graceful lease handoff. Questa è una nuova configurazione introdotta in KCL 3.x.  | TRUE | 
| gracefulLeaseHandoffTimeoutMillis | LeaseManagementConfig |  Speciifica il tempo minimo (in millisecondi) di attesa che lo shard corrente si spenga correttamente prima di RecordProcessor trasferire forzatamente il lease al proprietario successivo. Se il metodo ProcessRecords in genere dura più a lungo del valore predefinito, è consigliabile aumentare questa impostazione. In questo modo si garantisce che RecordProcessor disponga di tempo sufficiente per completare l'elaborazione prima che avvenga il trasferimento del leasing. Questa è una nuova configurazione introdotta in KCL 3.x.  | 30.000 (30 secondi) | 
| maxRecords | PollingConfig |  Consente di impostare il numero massimo di record restituiti da Kinesis.  | 10.000 | 
| retryGetRecordsInSeconds | PollingConfig |  Configura il ritardo tra i GetRecords tentativi di errore.  | Nessuno | 
| maxGetRecordsThreadPool | PollingConfig |  La dimensione del pool di thread utilizzato per. GetRecords  | Nessuno | 
| idleTimeBetweenReadsInMillis | PollingConfig |  Determina il tempo di attesa di KCL tra una GetRecords chiamata e l'altra per estrarre i dati dai flussi di dati. L'unità è in millisecondi.  | 1.500 | 
| callProcessRecordsEvenForEmptyRecordList | ProcessorConfig |  Quando impostato, l'elaboratore di record viene chiamato anche quando nessun record è stato fornito da .  | FALSE | 
| parentShardPollIntervalMillis | CoordinatorConfig |  Con quale frequenza un elaboratore di record deve eseguire il polling per vedere se il shard padre è stata completato. L'unità è in millisecondi.  | 10.000 (10 secondi) | 
| skipShardSyncAtWorkerInitializationIfLeaseExist | CoordinatorConfig |  Disabilita la sincronizzazione dei dati shard se la tabella di lease contiene lease esistenti.  |  FALSE  | 
| shardPrioritization | CoordinatorConfig |  Quale prioritizzazione shard utilizzare.  |  NoOpShardPrioritization  | 
| ClientVersionConfig | CoordinatorConfig |  Determina in quale modalità di compatibilità della versione KCL verrà eseguita l'applicazione. Questa configurazione è solo per la migrazione dalle versioni precedenti di KCL. Quando si esegue la migrazione a 3.x, è necessario impostare questa configurazione su. `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` È possibile rimuovere questa configurazione una volta completata la migrazione.  | CLIENT\$1VERSION\$1CONFIG\$13X | 
| taskBackoffTimeMillis | LifecycleConfig |  Il tempo di attesa per riprovare le attività KCL non riuscite. L'unità è in millisecondi.  | 500 (0,5 secondi) | 
| logWarningForTaskAfterMillis | LifecycleConfig |  Quanto tempo bisogna attendere prima che venga registrato un avviso se un'attività non è stata completata.  | Nessuno | 
| listShardsBackoffTimeInMillis | RetrievalConfig | Il numero di millisecondi di attesa tra le chiamate in ListShards quando si verificano errori. L'unità è in millisecondi. | 1.500 (1,5 secondi) | 
| maxListShardsRetryAttempts | RetrievalConfig | Il numero massimo di volte che ListShards effettua nuovi tentativi prima di desistere. | 50 | 
| metricsBufferTimeMillis | MetricsConfig |  Specifica la durata massima (in millisecondi) per memorizzare le metriche nel buffer prima di pubblicarle su. CloudWatch  | 10.000 (10 secondi) | 
| metricsMaxQueueSize | MetricsConfig |  Specifica il numero massimo di metriche da inserire nel buffer prima della pubblicazione. CloudWatch  | 10.000 | 
| metricsLevel | MetricsConfig |  Speciifica il livello di granularità delle CloudWatch metriche da abilitare e pubblicare.  Valori possibili: NONE, SUMMARY, DETAILED.  |  MetricsLevel.DETTAGLIATO  | 
| metricsEnabledDimensions | MetricsConfig |  Controlla le dimensioni consentite per le CloudWatch metriche.  | Tutte le dimensioni | 

**Configurazioni fuori produzione in KCL 3.x**

Le seguenti proprietà di configurazione non sono più disponibili in KCL 3.x:


**La tabella mostra le proprietà di configurazione interrotte per KCL 3.x**  

| Proprietà di configurazione | Classe di configurazione | Description | 
| --- | --- | --- | 
| maxLeasesToStealAtOneTime | LeaseManagementConfig |  Il numero massimo di lease che un'applicazione deve tentare di intercettare simultaneamente. KCL 3.x ignorerà questa configurazione e riassegnerà i leasing in base all'utilizzo delle risorse dei lavoratori.  | 
| enablePriorityLeaseAssignment | LeaseManagementConfig |  Controlla se i lavoratori devono dare la priorità ai contratti di locazione scaduti (contratti di locazione non rinnovati per il triplo del tempo di failover) e ai nuovi contratti di locazione condivisi, indipendentemente dal numero di leasing previsto, ma rispettando comunque i limiti massimi di leasing. KCL 3.x ignorerà questa configurazione e distribuirà sempre i leasing scaduti tra i lavoratori.  | 

**Importante**  
È comunque necessario disporre delle proprietà di configurazione interrotte durante la migrazione dalle versioni precedenti di KCL a KCL 3.x. Durante la migrazione, il lavoratore KCL inizierà innanzitutto con la modalità compatibile con KCL 2.x e passerà alla modalità di funzionalità KCL 3.x quando rileva che tutti i lavoratori KCL dell'applicazione sono pronti per eseguire KCL 3.x. Queste configurazioni interrotte sono necessarie mentre i lavoratori KCL eseguono la modalità compatibile con KCL 2.x.

# Politica del ciclo di vita della versione KCL
<a name="kcl-version-lifecycle-policy"></a>

Questo argomento descrive la politica del ciclo di vita delle versioni per Amazon Kinesis Client Library (KCL). AWS fornisce regolarmente nuove versioni per le versioni KCL per supportare nuove funzionalità e miglioramenti, correzioni di bug, patch di sicurezza e aggiornamenti delle dipendenze. Ti consigliamo di rimanere up-to-date con le versioni KCL per tenerti aggiornato sulle funzionalità più recenti, sugli aggiornamenti di sicurezza e sulle dipendenze sottostanti. **Non** consigliamo l'uso continuato di una versione KCL non supportata.

Il ciclo di vita delle principali versioni di KCL è costituito dalle tre fasi seguenti:
+ **Disponibilità generale (GA)**: durante questa fase, la versione principale è completamente supportata. AWS fornisce regolarmente versioni secondarie e patch che includono il supporto per nuove funzionalità o aggiornamenti delle API per Kinesis Data Streams, oltre a correzioni di bug e sicurezza.
+ **Modalità di manutenzione**: AWS limita il rilascio delle versioni delle patch per risolvere solo le correzioni di bug critici e i problemi di sicurezza. La versione principale non riceverà aggiornamenti per le nuove funzionalità o APIs per Kinesis Data Streams.
+ **E nd-of-support** — La versione principale non riceverà più aggiornamenti o versioni. Le versioni pubblicate in precedenza continueranno a essere disponibili tramite gestori di pacchetti pubblici e il codice rimarrà attivo GitHub. L'uso di una versione raggiunta end-of-support viene effettuato a discrezione dell'utente. Ti consigliamo di eseguire l'aggiornamento alla versione principale più recente.


| Versione principale | Fase attuale | Data di rilascio | Data della modalità di manutenzione | End-of-support data | 
| --- | --- | --- | --- | --- | 
| KCL 1.x | modalità di manutenzione | 19-12-2013 | 2025-04-17 | 2026-01-30 | 
| KCL 2.x | Disponibilità generale | 2018-08-02 | -- | -- | 
| KCL 3.x | Disponibilità generale | 2024-11-06 | -- | -- | 

# Esegui la migrazione dalle versioni precedenti di KCL
<a name="kcl-migration-previous-versions"></a>

Questo argomento spiega come migrare dalle versioni precedenti della Kinesis Client Library (KCL). 

## Cosa c'è di nuovo in KCL 3.0?
<a name="kcl-migration-new-3-0"></a>

Kinesis Client Library (KCL) 3.0 introduce diversi importanti miglioramenti rispetto alle versioni precedenti:
+  Riduce i costi di elaborazione per le applicazioni consumer ridistribuendo automaticamente il lavoro dai lavoratori sovrautilizzati ai lavoratori sottoutilizzati del parco applicazioni consumer. Questo nuovo algoritmo di bilanciamento del carico garantisce l'utilizzo della CPU distribuito in modo uniforme tra i lavoratori ed elimina la necessità di sovraccaricare i lavoratori.
+  Riduce i costi di DynamoDB associati a KCL ottimizzando le operazioni di lettura sulla tabella di leasing.
+ Riduce al minimo la rielaborazione dei dati quando i contratti di locazione vengono riassegnati a un altro lavoratore, permettendo al lavoratore corrente di completare il checkpoint dei record che ha elaborato.
+  Viene utilizzato AWS SDK for Java 2.x per migliorare le prestazioni e le funzionalità di sicurezza, eliminando completamente la dipendenza dalla versione 1.x. AWS SDK per Java 

Per ulteriori informazioni, consulta la nota di rilascio di [KCL 3.0](https://github.com/awslabs/amazon-kinesis-client/blob/master/CHANGELOG.md).

**Topics**
+ [Cosa c'è di nuovo in KCL 3.0?](#kcl-migration-new-3-0)
+ [Migrazione da KCL 2.x a KCL 3.x](kcl-migration-from-2-3.md)
+ [Rollback a una versione di KCL precedente](kcl-migration-rollback.md)
+ [Rollforward a KCL 3.x dopo un rollback](kcl-migration-rollforward.md)
+ [Procedure consigliate per la tabella di leasing con modalità di capacità assegnata](kcl-migration-lease-table.md)
+ [Migrazione di KLC da 1.x a 3.x](kcl-migration-1-3.md)

# Migrazione da KCL 2.x a KCL 3.x
<a name="kcl-migration-from-2-3"></a>

Questo argomento fornisce step-by-step istruzioni per migrare il consumatore da KCL 2.x a KCL 3.x. KCL 3.x supporta la migrazione in loco dei consumatori KCL 2.x. Puoi continuare a utilizzare i dati del tuo flusso di dati Kinesis mentre migri i tuoi lavoratori in modo continuativo.

**Importante**  
KCL 3.x mantiene le stesse interfacce e gli stessi metodi di KCL 2.x. Pertanto non è necessario aggiornare il codice di elaborazione dei record durante la migrazione. Tuttavia, è necessario impostare la configurazione corretta e verificare i passaggi richiesti per la migrazione. Ti consigliamo vivamente di seguire i seguenti passaggi di migrazione per un'esperienza di migrazione senza intoppi.

## Fase 1: prerequisiti
<a name="kcl-migration-from-2-3-prerequisites"></a>

Prima di iniziare a utilizzare KCL 3.x, assicurati di disporre di quanto segue:
+ Java Development Kit (JDK) 8 o versione successiva
+ AWS SDK per Java 2.x
+ Maven o Gradle per la gestione delle dipendenze

**Importante**  
Non utilizzare la AWS SDK per Java versione da 2.27.19 a 2.27.23 con KCL 3.x. Queste versioni includono un problema che causa un errore di eccezione relativo all'utilizzo di DynamoDB da parte di KCL. Si consiglia di utilizzare la AWS SDK per Java versione 2.28.0 o successiva per evitare questo problema. 

## Passaggio 2: aggiungere dipendenze
<a name="kcl-migration-from-2-3-dependencies"></a>

Se utilizzi Maven, aggiungi la seguente dipendenza al tuo file. `pom.xml` Assicurati di aver sostituito 3.x.x con l'ultima versione di KCL. 

```
<dependency>
    <groupId>software.amazon.kinesis</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>3.x.x</version> <!-- Use the latest version -->
</dependency>
```

Se stai usando Gradle, aggiungi quanto segue al tuo file. `build.gradle` Assicurati di aver sostituito 3.x.x con l'ultima versione di KCL. 

```
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
```

[Puoi verificare la versione più recente di KCL sul Maven Central Repository.](https://search.maven.org/artifact/software.amazon.kinesis/amazon-kinesis-client)

## Passaggio 3: configurare la configurazione relativa alla migrazione
<a name="kcl-migration-from-2-3-configuration"></a>

Per migrare da KCL 2.x a KCL 3.x, è necessario impostare il seguente parametro di configurazione:
+ CoordinatorConfig. clientVersionConfig: Questa configurazione determina in quale modalità di compatibilità della versione KCL verrà eseguita l'applicazione. Quando si esegue la migrazione da KCL 2.x a 3.x, è necessario impostare questa configurazione su. `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` Per impostare questa configurazione, aggiungi la seguente riga durante la creazione dell'oggetto scheduler:

```
configsBuilder.coordiantorConfig().clientVersionConfig(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPLATIBLE_WITH_2X)
```

Di seguito è riportato un esempio di come impostare la `CoordinatorConfig.clientVersionConfig` migrazione da KCL 2.x a 3.x. Puoi regolare altre configurazioni secondo necessità in base alle tue esigenze specifiche:

```
Scheduler scheduler = new Scheduler(
    configsBuilder.checkpointConfig(),
    configsBuilder.coordiantorConfig().clientVersionConfig(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPLATIBLE_WITH_2X),
    configsBuilder.leaseManagementConfig(),
    configsBuilder.lifecycleConfig(),
    configsBuilder.metricsConfig(),
    configsBuilder.processorConfig(),
    configsBuilder.retrievalConfig()
);
```

È importante che tutti gli operatori dell'applicazione consumer utilizzino lo stesso algoritmo di bilanciamento del carico in un determinato momento, poiché KCL 2.x e 3.x utilizzano algoritmi di bilanciamento del carico diversi. L'utilizzo di lavoratori con algoritmi di bilanciamento del carico diversi può causare una distribuzione del carico non ottimale poiché i due algoritmi funzionano in modo indipendente.

Questa impostazione di compatibilità KCL 2.x consente all'applicazione KCL 3.x di funzionare in una modalità compatibile con KCL 2.x e di utilizzare l'algoritmo di bilanciamento del carico per KCL 2.x fino a quando tutti i worker dell'applicazione consumer non saranno stati aggiornati a KCL 3.x. Una volta completata la migrazione, KCL passerà automaticamente alla modalità di funzionalità KCL 3.x completa e inizierà a utilizzare un nuovo algoritmo di bilanciamento del carico KCL 3.x per tutti i lavoratori in esecuzione.

**Importante**  
Se non stai utilizzando `ConfigsBuilder` ma stai creando un `LeaseManagementConfig` oggetto per impostare le configurazioni, devi aggiungere un altro parametro chiamato `applicationName` nella versione KCL 3.x o successiva. Per i dettagli, vedi [Errore di compilazione](https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#compiliation-error-leasemanagementconfig) con il costruttore. LeaseManagementConfig Si consiglia di `ConfigsBuilder` utilizzarlo per impostare le configurazioni KCL. `ConfigsBuilder`offre un modo più flessibile e gestibile per configurare l'applicazione KCL.

## Fase 4: Segui le migliori pratiche per l'implementazione del metodo shutdownRequested ()
<a name="kcl-migration-from-2-3-best-practice"></a>

KCL 3.x introduce una funzionalità chiamata *graceful lease handoff* per ridurre al minimo la rielaborazione dei dati quando un contratto di locazione viene ceduto a un altro lavoratore come parte del processo di riassegnazione del contratto di locazione. Ciò si ottiene selezionando l'ultimo numero di sequenza elaborato nella tabella di locazione prima della cessione del contratto di locazione. Per assicurarvi che il grazioso comando di lease funzioni correttamente, dovete assicurarvi di richiamare l'oggetto all'interno del metodo della classe. `checkpointer` `shutdownRequested` `RecordProcessor` Se non state invocando l'`checkpointer`oggetto all'interno del `shutdownRequested` metodo, potete implementarlo come illustrato nell'esempio seguente. 

**Importante**  
Il seguente esempio di implementazione è un requisito minimo per la consegna del leasing grazioso. È possibile estenderlo per includere una logica aggiuntiva relativa al checkpoint, se necessario. Se stai eseguendo un'elaborazione asincrona, assicurati che tutti i record consegnati al sistema a valle siano stati elaborati prima di richiamare il checkpoint. 
Sebbene la consegna gradita del leasing riduca in modo significativo la probabilità di ritrattamento dei dati durante i trasferimenti di leasing, non elimina completamente questa possibilità. Per preservare l'integrità e la coerenza dei dati, progetta le tue applicazioni consumer downstream in modo che siano idempotenti. Ciò significa che dovrebbero essere in grado di gestire potenziali elaborazioni di record duplicati senza effetti negativi sull'intero sistema.

```
/**
 * Invoked when either Scheduler has been requested to gracefully shutdown
 * or lease ownership is being transferred gracefully so the current owner
 * gets one last chance to checkpoint.
 *
 * Checkpoints and logs the data a final time.
 *
 * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
 *                               before the shutdown is completed.
 */
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
    try {
       // Ensure that all delivered records are processed 
       // and has been successfully flushed to the downstream before calling 
       // checkpoint
       // If you are performing any asynchronous processing or flushing to
       // downstream, you must wait for its completion before invoking
       // the below checkpoint method.
        log.info("Scheduler is shutting down, checkpointing.");
        shutdownRequestedInput.checkpointer().checkpoint();
    } catch (ShutdownException | InvalidStateException e) {
        log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
    } 
}
```

## Fase 5: Verifica i prerequisiti di KCL 3.x per la raccolta delle metriche dei lavoratori
<a name="kcl-migration-from-2-3-worker-metrics"></a>

KCL 3.x raccoglie i parametri di utilizzo della CPU, come l'utilizzo della CPU, dai lavoratori per bilanciare il carico tra i lavoratori in modo uniforme. Gli applicativi consumer possono essere eseguiti su Amazon EC2, Amazon ECS, Amazon EKS o. AWS Fargate KCL 3.x può raccogliere i parametri di utilizzo della CPU dai lavoratori solo quando vengono soddisfatti i seguenti prerequisiti:

 **Amazon Elastic Compute Cloud(Amazon EC2)**
+ Il sistema operativo deve essere Linux OS.
+ È necessario abilitarlo [IMDSv2](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html)nella propria istanza EC2.

 **Amazon Elastic Container Service (Amazon ECS) su Amazon EC2**
+ Il sistema operativo deve essere Linux OS.
+ È necessario abilitare l'endpoint [ECS Task Metadata Endpoint versione 4](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ec2-metadata.html). 
+ La versione dell'agente container Amazon ECS deve essere 1.39.0 o successiva.

 **Amazon ECS su AWS Fargate**
+ È necessario abilitare l'[endpoint dei metadati delle attività Fargate](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4-fargate.html) versione 4. Se si utilizza la versione 1.4.0 o successiva della piattaforma Fargate, questa opzione è abilitata per impostazione predefinita. 
+ Piattaforma Fargate versione 1.4.0 o successiva.

 **Amazon Elastic Kubernetes Service (Amazon EKS) su Amazon EC2** 
+ Il sistema operativo deve essere Linux OS.

 **Amazon EKS su AWS Fargate**
+ Piattaforma Fargate 1.3.0 o versione successiva.

**Importante**  
Se KCL 3.x non è in grado di raccogliere i parametri di utilizzo della CPU dai lavoratori perché i prerequisiti non sono soddisfatti, riequilibrerà il carico in base al livello di throughput per leasing. Questo meccanismo di ribilanciamento alternativo assicurerà che tutti i lavoratori ottengano livelli di produttività totale simili dai contratti di locazione assegnati a ciascun lavoratore. Per ulteriori informazioni, consulta [In che modo KCL assegna i contratti di locazione ai lavoratori e bilancia il carico](kcl-dynamoDB.md#kcl-assign-leases).

## Fase 6: Aggiornamento delle autorizzazioni IAM per KCL 3.x
<a name="kcl-migration-from-2-3-IAM-permissions"></a>

Devi aggiungere le seguenti autorizzazioni al ruolo o alla policy IAM associata alla tua applicazione consumer KCL 3.x. Ciò comporta l'aggiornamento della politica IAM esistente utilizzata dall'applicazione KCL. Per ulteriori informazioni, consulta [Autorizzazioni IAM richieste per le applicazioni consumer KCL](kcl-iam-permissions.md).

**Importante**  
Le tue applicazioni KCL esistenti potrebbero non avere le seguenti azioni e risorse IAM aggiunte nella policy IAM perché non erano necessarie in KCL 2.x. Assicurati di averle aggiunte prima di eseguire l'applicazione KCL 3.x:  
Azioni: `UpdateTable`  
Risorse (ARNs): `arn:aws:dynamodb:region:account:table/KCLApplicationName`
Azioni: `Query`  
Risorse (ARNs): `arn:aws:dynamodb:region:account:table/KCLApplicationName/index/*`
Azioni: `CreateTable``DescribeTable`,,`Scan`,`GetItem`,`PutItem`,`UpdateItem`, `DeleteItem`  
Risorse (ARNs):`arn:aws:dynamodb:region:account:table/KCLApplicationName-WorkerMetricStats`, `arn:aws:dynamodb:region:account:table/KCLApplicationName-CoordinatorState`
Sostituisci «regione», «account» e "KCLApplicationNome» rispettivamente ARNs con il tuo Regione AWS Account AWS numero e il nome dell'applicazione KCL. Se utilizzi configurazioni per personalizzare i nomi delle tabelle di metadati create da KCL, usa i nomi di tabella specificati anziché il nome dell'applicazione KCL.

## Passaggio 7: distribuisci il codice KCL 3.x ai tuoi lavoratori
<a name="kcl-migration-from-2-3-IAM-deploy"></a>

Dopo aver impostato la configurazione richiesta per la migrazione e completato tutte le liste di controllo sulla migrazione precedenti, puoi creare e distribuire il codice ai tuoi lavoratori.

**Nota**  
Se vedi un errore di compilazione con il `LeaseManagementConfig` costruttore, vedi Errore di [compilazione con il costruttore per informazioni sulla risoluzione dei LeaseManagementConfig problemi](https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#compilation-error-leasemanagementconfig).

## Fase 8: Completare la migrazione
<a name="kcl-migration-from-2-3-finish"></a>

Durante la distribuzione del codice KCL 3.x, KCL continua a utilizzare l'algoritmo di assegnazione del leasing di KCL 2.x. Dopo aver distribuito con successo il codice KCL 3.x a tutti i lavoratori, KCL lo rileva automaticamente e passa al nuovo algoritmo di assegnazione del leasing basato sull'utilizzo delle risorse dei lavoratori. Per ulteriori dettagli sul nuovo algoritmo di assegnazione del leasing, vedere. [In che modo KCL assegna i contratti di locazione ai lavoratori e bilancia il carico](kcl-dynamoDB.md#kcl-assign-leases)

Durante la distribuzione, è possibile monitorare il processo di migrazione emettendo le seguenti metriche a. CloudWatch È possibile monitorare le metriche durante l'operazione. `Migration` Tutte le metriche sono per-KCL-application metriche e impostate sul livello metrico. `SUMMARY` Se la `Sum` statistica della `CurrentState:3xWorker` metrica corrisponde al numero totale di lavoratori nell'applicazione KCL, indica che la migrazione a KCL 3.x è stata completata con successo.

**Importante**  
 KCL impiega almeno 10 minuti per passare al nuovo algoritmo di assegnazione del leasee dopo che tutti i lavoratori sono pronti a eseguirlo.


**CloudWatch metriche per il processo di migrazione KCL**  

| Metriche | Description | 
| --- | --- | 
| CurrentState:3xWorker |  Il numero di lavoratori KCL è migrato con successo a KCL 3.x e ha eseguito il nuovo algoritmo di assegnazione del leasing. Se il `Sum` conteggio di questa metrica corrisponde al numero totale dei lavoratori, indica che la migrazione a KCL 3.x è stata completata con successo. [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| CurrentState:2xCompatibleWorker |  Il numero di lavoratori KCL in esecuzione in modalità compatibile con KCL 2.x durante il processo di migrazione. Un valore diverso da zero per questa metrica indica che la migrazione è ancora in corso. [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| Fault |  Il numero di eccezioni riscontrate durante il processo di migrazione. La maggior parte di queste eccezioni sono errori transitori e KCL 3.x riproverà automaticamente a completare la migrazione. Se osservi un valore `Fault` metrico persistente, esamina i log del periodo di migrazione per un'ulteriore risoluzione dei problemi. Se il problema persiste, contatta. Supporto [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| GsiStatusReady |  Lo stato della creazione dell'indice secondario globale (GSI) nella tabella di leasing. Questa metrica indica se il GSI nella tabella di leasing è stato creato, un prerequisito per eseguire KCL 3.x. Il valore è 0 o 1, dove 1 indica una creazione riuscita. Durante uno stato di rollback, questa metrica non verrà emessa. Dopo aver effettuato nuovamente il rollforward, puoi riprendere a monitorare questa metrica. [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| workerMetricsReady |  Stato delle emissioni delle metriche dei lavoratori da parte di tutti i lavoratori. Le metriche indicano se tutti i lavoratori emettono parametri come l'utilizzo della CPU. Il valore è 0 o 1, dove 1 indica che tutti i lavoratori stanno emettendo correttamente le metriche e sono pronti per il nuovo algoritmo di assegnazione del leasing. Durante uno stato di rollback, questa metrica non verrà emessa. Dopo aver effettuato nuovamente il rollforward, puoi riprendere a monitorare questa metrica. [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/streams/latest/dev/kcl-migration-from-2-3.html)  | 

KCL fornisce funzionalità di rollback alla modalità compatibile con 2.x durante la migrazione. Una volta completata la migrazione a KCL 3.x, ti consigliamo di rimuovere l'`CoordinatorConfig.clientVersionConfig`impostazione `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` se il rollback non è più necessario. La rimozione di questa configurazione interrompe l'emissione di metriche relative alla migrazione dall'applicazione KCL.

**Nota**  
Ti consigliamo di monitorare le prestazioni e la stabilità dell'applicazione per un periodo durante la migrazione e dopo il completamento della migrazione. Se si riscontrano problemi, è possibile ripristinare i lavoratori affinché utilizzino la funzionalità compatibile con KCL 2.x utilizzando lo strumento di migrazione [KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py).

# Rollback a una versione di KCL precedente
<a name="kcl-migration-rollback"></a>

Questo argomento spiega i passaggi per ripristinare la versione precedente del consumatore. Quando è necessario eseguire il rollback, è prevista una procedura in due fasi: 

1. Esegui lo [Strumento di migrazione di KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py).

1. Ridistribuisci il codice della versione precedente di KCL (opzionale).

## Fase 1: eseguire lo Strumento di migrazione di KCL
<a name="kcl-migration-rollback-tool"></a>

Quando è necessario tornare alla versione precedente di KCL, occorre eseguire lo Strumento di migrazione KCL. Il KCL Migration Tool svolge due compiti importanti:
+ Rimuove una tabella di metadati denominata tabella delle metriche dei lavoratori e l’indice secondario globale nella tabella di lease in DynamoDB. Questi due artefatti sono creati da KCL 3.x ma non sono necessari quando si torna alla versione precedente.
+ Consente a tutti i lavoratori di funzionare in una modalità compatibile con KCL 2.x e di iniziare a utilizzare l'algoritmo di bilanciamento del carico utilizzato nelle versioni precedenti di KCL. In caso di problemi con il nuovo algoritmo di bilanciamento del carico in KCL 3.x, questo ridurrà immediatamente il problema.

**Importante**  
La tabella dello stato del coordinatore in DynamoDB deve esistere e non deve essere eliminata durante il processo di migrazione, rollback e rollforward. 

**Nota**  
È importante che tutti i lavoratori dell’applicazione consumer utilizzino lo stesso algoritmo di bilanciamento del carico in un determinato momento. Lo strumento di migrazione KCL fa in modo che tutti i lavoratori dell'applicazione consumer KCL 3.x passino alla modalità compatibile con KCL 2.x in modo che tutti i lavoratori eseguano lo stesso algoritmo di bilanciamento del carico durante il roll-depayment alla versione precedente di KCL.

[Puoi scaricare lo [strumento di migrazione KCL nella directory degli script del repository KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py). GitHub](https://github.com/awslabs/amazon-kinesis-client/tree/master) Lo script può essere eseguito da qualsiasi lavoratore o da qualsiasi host che disponga delle autorizzazioni necessarie per scrivere nella tabella degli stati del coordinatore, eliminare la tabella delle metriche dei lavoratori e aggiornare la tabella dei leasing. Puoi fare riferimento all'autorizzazione IAM richiesta [Autorizzazioni IAM richieste per le applicazioni consumer KCL](kcl-iam-permissions.md) per eseguire lo script. È necessario eseguire lo script solo una volta per applicazione KCL. È possibile eseguire lo strumento di migrazione KCL con il seguente comando: 

```
python3 ./KclMigrationTool.py --region <region> --mode rollback [--application_name <applicationName>] [--lease_table_name <leaseTableName>] [--coordinator_state_table_name <coordinatorStateTableName>] [--worker_metrics_table_name <workerMetricsTableName>]
```

**Parametri**
+ --region: sostituisci `<region>` con il tuo. Regione AWS
+ --application\$1name: Questo parametro è obbligatorio se utilizzi nomi predefiniti per le tabelle dei metadati di DynamoDB (lease table, coordinator state table e worker metrics table). Se sono stati specificati nomi personalizzati per queste tabelle, è possibile omettere questo parametro. `<applicationName>`Sostituiscilo con il nome effettivo dell'applicazione KCL. Lo strumento utilizza questo nome per ricavare i nomi delle tabelle predefiniti se non vengono forniti nomi personalizzati.
+ --lease\$1table\$1name (opzionale): questo parametro è necessario quando hai impostato un nome personalizzato per la tabella di leasing nella tua configurazione KCL. Se si utilizza il nome della tabella predefinito, è possibile omettere questo parametro. Sostituiscilo `leaseTableName` con il nome della tabella personalizzata che hai specificato per la tua tabella di leasing.
+ --coordinator\$1state\$1table\$1name (opzionale): questo parametro è necessario quando hai impostato un nome personalizzato per la tabella degli stati del coordinatore nella tua configurazione KCL. Se si utilizza il nome della tabella predefinito, è possibile omettere questo parametro. Sostituisci `<coordinatorStateTableName>` con il nome della tabella personalizzata che hai specificato per la tabella degli stati del coordinatore. 
+ --worker\$1metrics\$1table\$1name (opzionale): questo parametro è necessario quando hai impostato un nome personalizzato per la tabella delle metriche dei lavoratori nella tua configurazione KCL. Se si utilizza il nome della tabella predefinito, è possibile omettere questo parametro. Sostituiscilo con il nome `<workerMetricsTableName>` della tabella personalizzata che hai specificato per la tabella delle metriche dei lavoratori. 

## Passaggio 2: ridistribuisci il codice con la versione precedente di KCL (opzionale)
<a name="kcl-migration-rollback-redeploy"></a>

 Dopo aver eseguito lo strumento di migrazione di KCL per un rollback, verrà mostrato uno di questi messaggi:
+ **Messaggio 1:** «Rollback completato. L'applicazione KCL eseguiva la modalità compatibile con KCL 2.x. Se non vedi alcuna mitigazione di alcuna regressione, torna ai file binari dell'applicazione precedente distribuendo il codice con la versione KCL precedente.»
  + **Azione richiesta:** Ciò significa che i tuoi lavoratori funzionavano nella modalità compatibile con KCL 2.x. Se il problema persiste, ridistribuisci il codice con la versione precedente di KCL ai tuoi lavoratori.
+ **Messaggio 2:** «Rollback completato. L'applicazione KCL eseguiva la modalità di funzionalità KCL 3.x. Il rollback ai file binari dell'applicazione precedente non è necessario, a meno che non si veda alcuna mitigazione del problema entro 5 minuti. Se il problema persiste, ripristina i file binari dell'applicazione precedente distribuendo il codice con la versione precedente di KCL.»
  + **Azione richiesta:** Ciò significa che i tuoi lavoratori lavoravano in modalità KCL 3.x e lo strumento di migrazione KCL ha portato tutti i lavoratori alla modalità compatibile con KCL 2.x. Se il problema viene risolto, non è necessario ridistribuire il codice con la versione precedente di KCL. Se il problema persiste, ridistribuisci il codice con la versione precedente di KCL ai tuoi worker.

 

# Rollforward a KCL 3.x dopo un rollback
<a name="kcl-migration-rollforward"></a>

Questo argomento spiega i passaggi per riportare il consumatore a KCL 3.x dopo un rollback. Quando è necessario eseguire il rollforward, è necessario eseguire una procedura in due fasi: 

1. Esegui lo [Strumento di migrazione di KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py). 

1. Implementa il codice con KCL 3.x.

## Fase 1: eseguire lo Strumento di migrazione di KCL
<a name="kcl-migration-rollback-tool"></a>

Esegui lo Strumento di migrazione di KCL. Strumento di migrazione KCL con il seguente comando per passare a KCL 3.x:

```
python3 ./KclMigrationTool.py --region <region> --mode rollforward [--application_name <applicationName>] [--coordinator_state_table_name <coordinatorStateTableName>]
```

**Parametri**
+ --region: sostituisci con il tuo. `<region>` Regione AWS
+ --application\$1name: questo parametro è obbligatorio se utilizzi nomi predefiniti per la tabella degli stati del coordinatore. Se sono stati specificati nomi personalizzati per la tabella dello stato del coordinatore, è possibile omettere questo parametro. Sostituiscilo `<applicationName>` con il nome effettivo dell'applicazione KCL. Lo strumento utilizza questo nome per ricavare i nomi delle tabelle predefiniti se non vengono forniti nomi personalizzati.
+ --coordinator\$1state\$1table\$1name (opzionale): questo parametro è necessario quando hai impostato un nome personalizzato per la tabella degli stati del coordinatore nella tua configurazione KCL. Se si utilizza il nome della tabella predefinito, è possibile omettere questo parametro. Sostituisci `<coordinatorStateTableName>` con il nome della tabella personalizzata che hai specificato per la tabella degli stati del coordinatore. 

Dopo aver eseguito lo strumento di migrazione in modalità rollforward, KCL crea le seguenti risorse DynamoDB necessarie per KCL 3.x:
+ Un indice secondario globale nella tabella di lease
+ Una tabella delle metriche dei lavoratori

## Fase 2: implementare il codice con KCL 3.x
<a name="kcl-migration-rollback-redeploy"></a>

Dopo aver eseguito lo Strumento di migrazione di KCL per un rollforward, implementa il codice con KCL 3.x sui lavoratori. Segui [Fase 8: Completare la migrazione](kcl-migration-from-2-3.md#kcl-migration-from-2-3-finish) per completare la migrazione.

# Procedure consigliate per la tabella di leasing con modalità di capacità assegnata
<a name="kcl-migration-lease-table"></a>

Se la tabella di leasing dell'applicazione KCL è passata alla modalità di capacità fornita, KCL 3.x crea un indice secondario globale nella tabella di leasing con la modalità di fatturazione assegnata e le stesse unità di capacità di lettura (RCU) e unità di capacità di scrittura (WCU) della tabella di leasing di base. Quando viene creato l'indice secondario globale, consigliamo di monitorare l'utilizzo effettivo dell'indice secondario globale nella console DynamoDB e di regolare le unità di capacità, se necessario. Per una guida più dettagliata sulla modifica della modalità di capacità delle tabelle di metadati DynamoDB create da KCL, vedere. [Modalità di capacità DynamoDB per le tabelle di metadati create da KCL](kcl-dynamoDB.md#kcl-capacity-mode) 

**Nota**  
Per impostazione predefinita, KCL crea tabelle di metadati come la tabella di leasing, la tabella delle metriche dei lavoratori e la tabella degli stati dei coordinatori e l'indice secondario globale nella tabella di leasing utilizzando la modalità di capacità su richiesta. Ti consigliamo di utilizzare la modalità di capacità su richiesta per regolare automaticamente la capacità in base alle modifiche all'utilizzo. 

# Migrazione di KLC da 1.x a 3.x
<a name="kcl-migration-1-3"></a>

Questo argomento spiega le istruzioni per migrare il consumatore da KCL 1.x a KCL 3.x. KCL 1.x utilizza classi e interfacce diverse rispetto a KCL 2.x e KCL 3.x. È necessario prima migrare il record processor, il record processor factory e le classi worker al formato compatibile con KCL 2.x/3.x e seguire i passaggi di migrazione per la migrazione da KCL 2.x a KCL 3.x. Puoi eseguire l'aggiornamento direttamente da KCL 1.x a KCL 3.x.
+ **Passaggio 1: migrazione del processore di registrazione**

  Segui la sezione [Migrare il processore di registrazione](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration) nella pagina [Migrare i consumatori da KCL 1.x a KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration).
+ **Fase 2: Migrazione della fabbrica del processore di registrazione**

  Segui la sezione [Migrare la fabbrica del processore di registrazione](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-factory-migration) nella pagina [Migrare i consumatori da KCL 1.x a KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration).
+ **Fase 3: Migrare il lavoratore**

  Segui la sezione [Migrare il lavoratore](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#worker-migration) nella pagina [Migrare i consumatori da KCL 1.x a KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration).
+ **Fase 4: Migrazione della configurazione di KCL 1.x**

  Segui la sezione [Configura il client Amazon Kinesis](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#client-configuration) nella pagina [Migrare i consumatori da KCL 1.x a KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration).
+ **Passaggio 5: verifica la rimozione dei periodi di inattività e le rimozioni della configurazione del client**

  Segui [le sezioni Rimozione dei tempi di inattività e Rimozione](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#idle-time-removal) [della configurazione del client nella pagina Migrare i](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#client-configuration-removals) [consumatori da KCL 1.x a KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration).
+ **Passaggio 6: segui le step-by-step istruzioni nella guida alla migrazione da KCL 2.x a KCL 3.x**

  Segui le istruzioni sulla [Migrazione da KCL 2.x a KCL 3.x](kcl-migration-from-2-3.md) pagina per completare la migrazione. Se devi tornare alla versione precedente di KCL o passare a KCL 3.x dopo un rollback, consulta e. [Rollback a una versione di KCL precedente](kcl-migration-rollback.md) [Rollforward a KCL 3.x dopo un rollback](kcl-migration-rollforward.md)

**Importante**  
Non utilizzare la AWS SDK per Java versione da 2.27.19 a 2.27.23 con KCL 3.x. Queste versioni includono un problema che causa un errore di eccezione relativo all'utilizzo di DynamoDB da parte di KCL. Si consiglia di utilizzare la AWS SDK per Java versione 2.28.0 o successiva per evitare questo problema. 

# Documentazione della versione precedente di KCL
<a name="kcl-archive"></a>

I seguenti argomenti sono stati archiviati. Per visualizzare la documentazione corrente di Kinesis Client Library, vedere. [Usa la libreria client Kinesis](kcl.md)

**Importante**  
Le versioni 1.x e 2.x di Amazon KCL sono obsolete. KCL 1.x sarà disponibile il 30 end-of-support gennaio 2026. **Ti consigliamo vivamente di** migrare le tue applicazioni KCL utilizzando la versione 1.x all'ultima versione di KCL prima del 30 gennaio 2026. Per trovare la versione più recente di KCL, consulta la pagina [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) su. GitHub Per informazioni sulle ultime versioni di KCL, consulta. [Usa la libreria client Kinesis](kcl.md) Per informazioni sulla migrazione da KCL 1.x a KCL 3.x, consulta. [Migrazione di KLC da 1.x a 3.x](kcl-migration-1-3.md)

**Topics**
+ [Informazioni su KCL 1.x e 2.x](shared-throughput-kcl-consumers.md)
+ [Sviluppa consumatori personalizzati con un throughput condiviso](shared-throughput-consumers.md)
+ [Migra i consumatori da KCL 1.x a KCL 2.x](kcl-migration.md)

# Informazioni su KCL 1.x e 2.x
<a name="shared-throughput-kcl-consumers"></a>

**Importante**  
Le versioni 1.x e 2.x di Amazon KCL sono obsolete. KCL 1.x arriverà il 30 end-of-support gennaio 2026. **Ti consigliamo vivamente di** migrare le tue applicazioni KCL utilizzando la versione 1.x all'ultima versione di KCL prima del 30 gennaio 2026. Per trovare la versione più recente di KCL, consulta la pagina [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) su. GitHub Per informazioni sulle ultime versioni di KCL, consulta. [Usa la libreria client Kinesis](kcl.md) Per informazioni sulla migrazione da KCL 1.x a KCL 3.x, consulta. [Migrazione di KLC da 1.x a 3.x](kcl-migration-1-3.md)

Uno dei metodi per sviluppare applicazioni consumer personalizzate in grado di elaborare i dati dai flussi di dati KDS consiste nell'utilizzare la Kinesis Client Library (KCL).

**Topics**
+ [Informazioni su KCL (versioni precedenti)](#shared-throughput-kcl-consumers-overview)
+ [Versioni precedenti di KCL](#shared-throughput-kcl-consumers-versions)
+ [Concetti KCL (versioni precedenti)](#shared-throughput-kcl-consumers-concepts)
+ [Utilizza una tabella di leasing per tenere traccia degli shard elaborati dall'applicazione consumer KCL](#shared-throughput-kcl-consumers-leasetable)
+ [Elabora più flussi di dati con la stessa applicazione consumer KCL 2.x per Java](#shared-throughput-kcl-multistream)
+ [Usa KCL con lo Schema Registry AWS Glue](#shared-throughput-kcl-consumers-glue-schema-registry)

**Nota**  
Sia per KCL 1.x che per KCL 2.x, si consiglia di eseguire l'aggiornamento alla versione più recente di KCL 1.x o KCL 2.x, a seconda dello scenario di utilizzo. Sia KCL 1.x che KCL 2.x vengono regolarmente aggiornate con versioni più recenti che includono le ultime patch di dipendenza e sicurezza, correzioni di bug e nuove funzionalità retrocompatibili. [Per ulteriori informazioni, consulta /releases. https://github.com/awslabs/ amazon-kinesis-client](https://github.com/awslabs/amazon-kinesis-client/releases)

## Informazioni su KCL (versioni precedenti)
<a name="shared-throughput-kcl-consumers-overview"></a>

KCL ti aiuta a consumare ed elaborare i dati da un flusso di dati Kinesis occupandoti di molte delle attività complesse associate al calcolo distribuito. Queste includono il bilanciamento del carico su più istanze di applicazioni consumer, la risposta agli errori delle istanze delle applicazioni consumer, il checkpoint dei record elaborati e la reazione al ripartizionamento. La KCL si occupa di tutte queste attività secondarie in modo che tu possa concentrare i tuoi sforzi sulla scrittura della tua logica di elaborazione dei record personalizzata.

KCL è diverso dai Kinesis Data APIs Streams disponibili in. AWS SDKs Kinesis APIs Data Streams ti aiuta a gestire molti aspetti di Kinesis Data Streams, tra cui la creazione di stream, il resharding e l'inserimento e l'acquisizione di record. La KCL fornisce un livello di astrazione su tutte queste sottoattività, in particolare per consentirti di concentrarti sulla logica di elaborazione dei dati personalizzata dell'applicazione consumer. Per ulteriori informazioni sulle API del flusso di dati Kinesis, consulta la [Documentazione di riferimento delle API di Amazon Kinesis](https://docs.aws.amazon.com/kinesis/latest/APIReference/Welcome.html).

**Importante**  
La KCL è una libreria Java. Il supporto per linguaggi diversi da Java viene fornito utilizzando un'interfaccia multilingue chiamata. MultiLangDaemon Questo daemon è basato su Java e viene eseguito in background quando si utilizza un linguaggio KCL diverso da Java. Ad esempio, se installi KCL per Python e scrivi la tua applicazione consumer interamente in Python, avrai comunque bisogno che Java sia installato sul tuo sistema a causa di. MultiLangDaemon Inoltre, MultiLangDaemon ha alcune impostazioni predefinite che potresti dover personalizzare in base al tuo caso d'uso, ad esempio la AWS regione a cui si connette. Per ulteriori informazioni su MultiLangDaemon on GitHub, vedere il [ MultiLangDaemon progetto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

La KCL funge da intermediario tra la tua logica di elaborazione di record e il flusso di dati Kinesis. 

## Versioni precedenti di KCL
<a name="shared-throughput-kcl-consumers-versions"></a>

Al momento per creare applicazioni consumer personalizzate puoi utilizzare una delle seguenti versioni supportate di KCL:
+ **KCL 1.x**

  Per ulteriori informazioni, consulta [Sviluppa i consumatori di KCL 1.x](developing-consumers-with-kcl.md)
+ **KCL 2.x**

  Per ulteriori informazioni, consulta [Sviluppa KCL 2.x Consumers](developing-consumers-with-kcl-v2.md)

È possibile utilizzare KCL 1.x o KCL 2.x per creare applicazioni consumer che utilizzano una velocità di trasmissione effettiva condivisa. Per ulteriori informazioni, consulta [Sviluppa consumatori personalizzati con un throughput condiviso utilizzando KCL](custom-kcl-consumers.md).

Per creare applicazioni consumer che utilizzano una velocità di trasmissione effettiva dedicata (utenti fan-out avanzati), è possibile utilizzare solo KCL 2.x. Per ulteriori informazioni, consulta [Sviluppa consumatori con fan-out migliorati con un throughput dedicato](enhanced-consumers.md).

Per informazioni sulle differenze tra KCL 1.x e KCL 2.x e le istruzioni su come passare da KCL 1.x a KCL 2.x, consulta [Migra i consumatori da KCL 1.x a KCL 2.x](kcl-migration.md).

## Concetti KCL (versioni precedenti)
<a name="shared-throughput-kcl-consumers-concepts"></a>
+ **Applicazione consumer KCL**: un'applicazione personalizzata che utilizza KCL e progettata per leggere ed elaborare i record dai flussi di dati. 
+ **Istanza di applicazioni consumer**: le applicazioni consumer KCL sono generalmente distribuite, con una o più istanze applicative eseguite contemporaneamente per coordinarsi in caso di guasti e bilanciare dinamicamente l'elaborazione dei record di dati.
+ **Worker**: una classe di livello superiore utilizzata da un'istanza di applicazione consumer KCL per iniziare l'elaborazione dei dati. 
**Importante**  
Ogni istanza dell'applicazione consumer KCL ha un worker. 

  Il worker inizializza e supervisiona varie attività, tra cui la sincronizzazione delle informazioni sulle partizioni e sui lease, il monitoraggio delle assegnazioni delle partizioni e l'elaborazione dei dati dalle partizioni. Un worker fornisce a KCL le informazioni di configurazione per l'applicazione consumer, ad esempio il nome del flusso di dati i cui record di dati verranno elaborati dall'applicazione consumer KCL e le AWS credenziali necessarie per accedere a questo flusso di dati. Il worker avvia inoltre quella specifica istanza dell'applicazione consumer KCL per fornire i record di dati dal flusso di dati ai processori di record.
**Importante**  
In KCL 1.x questa classe si chiama **Worker**. [Per ulteriori informazioni, (questi sono i repository Java KCL), vedere/.java. https://github.com/awslabs/ amazon-kinesis-client blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java) In KCL 2.x questa classe si chiama **Scheduler**. Lo scopo di Scheduler in KCL 2.x è identico allo scopo di Worker in KCL 1.x. [Per ulteriori informazioni sulla classe Scheduler in KCL 2.x, vedete/.java. https://github.com/awslabs/ amazon-kinesis-client blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java) 
+ **Lease**: dati che definiscono l'associazione tra un worker e una partizione. Le applicazioni consumer KCL distribuite utilizzano i lease per suddividere l'elaborazione dei record di dati tra un parco istanze di worker. In qualsiasi momento, ogni partizione di record di dati è legato a un determinato worker tramite un lease identificato dalla variabile **leaseKey**. 

  Per impostazione predefinita, un lavoratore può detenere uno o più contratti di locazione (in base al valore della variabile **maxLeasesForWorker**) contemporaneamente. 
**Importante**  
Ogni worker si impegna a detenere tutti i lease disponibili per tutte le partizioni disponibili in un flusso di dati. Ma solo un worker alla volta si aggiudicherà con successo ogni lease. 

  Ad esempio, se si dispone di un'istanza di applicazione consumer A con worker A che elabora un flusso di dati con 4 partizioni, il worker A può detenere i lease per le partizioni 1, 2, 3 e 4 contemporaneamente. Tuttavia, se si dispone di due istanze di applicazioni consumer, A e B, con worker A e worker B, e queste istanze elaborano un flusso di dati con 4 partizioni, il worker A e il worker B non possono entrambi detenere il lease per la partizione 1 contemporaneamente. Un worker detiene il lease di una particolare partizione finché non è pronto a interrompere l'elaborazione dei record di dati della partizione o fino a quando non si verifica un guasto. Quando un worker smette di detenere il lease, un altro worker lo riprende e lo mantiene. 

  [Per ulteriori informazioni (questi sono i repository Java KCL), vedere [https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java per KCL 1.x e https://github.com/awslabs/amazon-kinesis-client/.java](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java) per KCL 2.x. blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java)
+ **Tabella di lease:** una tabella univoca di Amazon DynamoDB utilizzata per tenere traccia delle partizioni in un flusso di dati KDS sottoposte a lease ed elaborate dai worker dell'applicazione consumer KCL. La tabella di lease deve rimanere sincronizzata (all'interno di un worker e tra tutti i worker) con le ultime informazioni sulle partizioni provenienti dal flusso di dati mentre l'applicazione consumer KCL è in esecuzione. Per ulteriori informazioni, consulta [Utilizza una tabella di leasing per tenere traccia degli shard elaborati dall'applicazione consumer KCL](#shared-throughput-kcl-consumers-leasetable).
+ **Processore di record**: la logica che definisce il modo in cui l'applicazione consumer KCL elabora i dati che riceve dai flussi di dati. Durante il runtime, un'istanza dell'applicazione consumer KCL istanzia un worker e questo worker istanzia un processore di record per ogni partizione su cui detiene un lease. 

## Utilizza una tabella di leasing per tenere traccia degli shard elaborati dall'applicazione consumer KCL
<a name="shared-throughput-kcl-consumers-leasetable"></a>

**Topics**
+ [Cos'è una tabella di leasing](#shared-throughput-kcl-consumers-what-is-leasetable)
+ [Throughput](#shared-throughput-kcl-leasetable-throughput)
+ [Come viene sincronizzata una tabella di leasing con gli shard in un flusso di dati Kinesis](#shared-throughput-kcl-consumers-leasetable-sync)

### Cos'è una tabella di leasing
<a name="shared-throughput-kcl-consumers-what-is-leasetable"></a>

Tabella di lease: per ogni applicazione Flusso di dati Amazon Kinesis, la KCL utilizza una tabella di lease univoca (archiviata in una tabella di Amazon DynamoDB) per tenere traccia delle partizioni in un flusso di dati KDS sottoposte a lease ed elaborate dai worker dell'applicazione consumer KCL.

**Importante**  
La KCL utilizza il nome dell'applicazione consumer per creare il nome della tabella di lease utilizzata da questa applicazione consumer, pertanto il nome di ogni applicazione consumer deve essere univoco.

È possibile visualizzare la tabella di lease utilizzando la [console Amazon DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ConsoleDynamoDB.html) mentre l'applicazione è in esecuzione.

Se la tabella di lease per l'applicazione consumer KCL non esiste all'avvio dell'applicazione, uno dei worker la crea per l'applicazione. 

**Importante**  
 Il tuo account sarà addebitato per i costi associati alla tabella DynamoDB, oltre ai costi associati al flusso di dati Kinesis stesso. 

Ogni riga della tabella di lease rappresenta una partizione che viene elaborata dalla tua applicazione consumer. Quando l'applicazione consumer KCL esistente è configurata per elaborare un solo flusso di dati, allora `leaseKey` (che è la chiave hash per la tabella di lease) è l'ID della partizione. Se sei [Elabora più flussi di dati con la stessa applicazione consumer KCL 2.x per Java](#shared-throughput-kcl-multistream), allora la struttura di leaseKey avrà il seguente aspetto: `account-id:StreamName:streamCreationTimestamp:ShardId`. Ad esempio, `111111111:multiStreamTest-1:12345:shardId-000000000336`.

Oltre all'ID dello shard, ogni riga include anche i seguenti dati:
+ **checkpoint:** il numero di sequenza di checkpoint più recente per lo shard. Questo valore è univoco per tutte le partizioni nel flusso di dati.
+ **checkpointSubSequenceNumero:** quando si utilizza la funzione di aggregazione della Kinesis Producer Library, si tratta di un'estensione del **checkpoint** che tiene traccia dei record dei singoli utenti all'interno del record Kinesis.
+ **leaseCounter:** utilizzato per la funzione Versioni multiple del lease, in modo tale da permettere ai lavoratori di rilevare che il loro lease è stato preso da un altro lavoratore.
+ **leaseKey:** un identificatore univoco per un lease. Ogni lease è specifico di una partizione nel flusso di dati ed è detenuto da un worker alla volta.
+ **leaseOwner:** il lavoratore che detiene questo lease.
+ **ownerSwitchesSinceCheckpoint:** quante volte questo contratto di locazione ha cambiato lavoratori dall'ultima volta che è stato scritto un checkpoint.
+ **parentShardId:** Utilizzato per garantire che lo shard principale sia completamente elaborato prima che inizi l'elaborazione sui frammenti secondari. In questo modo, ci si assicura che i record siano elaborati nello stesso ordine in cui sono stati introdotti nel flusso.
+ **hashrange:** utilizzato dalla `PeriodicShardSyncManager` per eseguire sincronizzazioni periodiche per trovare le partizioni mancanti nella tabella di lease e creare lease per esse, se necessario. 
**Nota**  
Questi dati sono presenti nella tabella di lease per ogni partizione a partire da KCL 1.14 e KCL 2.3. Per ulteriori informazioni su `PeriodicShardSyncManager` e sulla sincronizzazione periodica tra lease e partizioni, consulta [Come viene sincronizzata una tabella di leasing con gli shard in un flusso di dati Kinesis](#shared-throughput-kcl-consumers-leasetable-sync).
+ **childshards:** utilizzato da `LeaseCleanupManager` per esaminare lo stato di elaborazione della partizione secondaria e decidere se la partizione principale può essere eliminata dalla tabella di lease.
**Nota**  
Questi dati sono presenti nella tabella di lease per ogni partizione a partire da KCL 1.14 e KCL 2.3.
+ **shardID:** l'ID della partizione.
**Nota**  
Questi dati sono presenti nella tabella dei lease solo se sei [Elabora più flussi di dati con la stessa applicazione consumer KCL 2.x per Java](#shared-throughput-kcl-multistream). È supportato solo in KCL 2.x per Java, a partire da KCL 2.3 per Java e versioni successive. 
+ **nome del flusso:** l'identificatore del flusso di dati nel seguente formato: `account-id:StreamName:streamCreationTimestamp`.
**Nota**  
Questi dati sono presenti nella tabella dei lease solo se sei [Elabora più flussi di dati con la stessa applicazione consumer KCL 2.x per Java](#shared-throughput-kcl-multistream). È supportato solo in KCL 2.x per Java, a partire da KCL 2.3 per Java e versioni successive. 

### Throughput
<a name="shared-throughput-kcl-leasetable-throughput"></a>

Se l'applicazione Flusso di dati Amazon Kinesis riceve eccezioni di velocità di trasmissione effettiva assegnata, dovrai aumentare la velocità di trasmissione effettiva assegnata per la tabella DynamoDB. La KCL crea la tabella con una velocità di trasmissione effettiva assegnata di 10 letture al secondo e 10 scritture al secondo, ma questo potrebbe non essere sufficiente per l'applicazione. Ad esempio, se la tua applicazione Flusso di dati Amazon Kinesis crea frequentemente dei checkpoint o opera in un flusso che è composto da molte partizioni, potrebbe essere necessaria una velocità di trasmissione effettiva maggiore.

Per informazioni sulla velocità di trasmissione effettiva assegnata in DynamoDB, consulta [Modalità capacità di lettura/scrittura](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html) e [Utilizzo di tabelle e dati](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithDDTables.html) nella *Guida per gli sviluppatori di Amazon DynamoDB*.

### Come viene sincronizzata una tabella di leasing con gli shard in un flusso di dati Kinesis
<a name="shared-throughput-kcl-consumers-leasetable-sync"></a>

I worker delle applicazioni consumer KCL utilizzano i lease per elaborare le partizioni di un determinato flusso di dati. Le informazioni su quale worker sta eseguendo il lease di una partizione in un dato momento vengono archiviate in una tabella di lease. La tabella di lease deve rimanere sincronizzata con le ultime informazioni sulle partizioni provenienti dal flusso di dati mentre l'applicazione consumer KCL è in esecuzione. La KCL sincronizza la tabella di lease con le informazioni sulle partizioni acquisite dal servizio del flusso di dati Kinesis durante l'avvio dell'applicazione consumer (quando l'applicazione consumer viene inizializzata o riavviata) e anche ogni volta che una partizione in fase di elaborazione raggiunge il suo termine (ripartizionamento). In altre parole, i worker o un'applicazione consumer KCL vengono sincronizzati con il flusso di dati che stanno elaborando durante l'avvio iniziale dell'applicazione consumer e ogni volta che l'applicazione consumer incontra un evento di ripartizionamento del flusso di dati.

**Topics**
+ [Sincronizzazione in KCL 1.0 - 1.13 e KCL 2.0 - 2.2](#shared-throughput-kcl-consumers-leasetable-sync-old)
+ [Sincronizzazione in KCL 2.x, a partire da KCL 2.3 e versioni successive](#shared-throughput-kcl-consumers-leasetable-sync-new-kcl2)
+ [Sincronizzazione in KCL 1.x, a partire da KCL 1.14 e versioni successive](#shared-throughput-kcl-consumers-leasetable-sync-new-kcl1)

#### Sincronizzazione in KCL 1.0 - 1.13 e KCL 2.0 - 2.2
<a name="shared-throughput-kcl-consumers-leasetable-sync-old"></a>

In KCL 1.0 - 1.13 e KCL 2.0 - 2.2, durante l'avvio dell'applicazione consumer e anche durante ogni evento di reshard del flusso di dati, KCL sincronizza la tabella di lease con le informazioni sugli shard acquisite dal servizio Kinesis Data Streams richiamando l'or the discovery. `ListShards` `DescribeStream` APIs In tutte le versioni KCL sopra elencate, ogni lavoratore di un'applicazione consumer KCL completa i seguenti passaggi per eseguire il processo di lease/shard sincronizzazione durante l'avvio dell'applicazione consumer e in occasione di ogni evento stream reshard:
+ Recupera tutte le partizioni per i dati elaborati dal flusso
+ Recupera tutte i lease delle partizioni dalla tabella di lease
+ Filtra ogni partizione aperta che non ha un lease nella tabella di lease
+ Itera su tutte le partizioni aperte trovate e per ogni partizione aperta senza un elemento principale aperto:
  + Attraversa l'albero gerarchico lungo il percorso dei suoi antenati per determinare se la partizione è un discendente. Una partizione è considerata un discendente se una partizione antenata è in fase di elaborazione (la voce di lease relativa alla partizione antenata esiste nella tabella di lease) o se è necessario elaborare una partizione antenata (ad esempio, se la posizione iniziale è `TRIM_HORIZON` o `AT_TIMESTAMP`)
  + Se la partizione aperta nel contesto è un discendente, la KCL controlla la partizione in base alla posizione iniziale e crea dei lease per i suoi elementi principali, se necessario

#### Sincronizzazione in KCL 2.x, a partire da KCL 2.3 e versioni successive
<a name="shared-throughput-kcl-consumers-leasetable-sync-new-kcl2"></a>

A partire dalle ultime versioni supportate di KCL 2.x (KCL 2.3) e successive, la libreria ora supporta le seguenti modifiche al processo di sincronizzazione. Queste modifiche lease/shard alla sincronizzazione riducono significativamente il numero di chiamate API effettuate dalle applicazioni consumer KCL al servizio Kinesis Data Streams e ottimizzano la gestione del leasing nell'applicazione consumer KCL. 
+ Durante l'avvio dell'applicazione, se la tabella di lease è vuota, la KCL utilizza l'opzione di filtro dell'API `ListShard` (il parametro di richiesta `ShardFilter` facoltativo) per recuperare e creare lease solo per uno snapshot di partizioni aperte nel momento specificato dal parametro `ShardFilter`. Il parametro `ShardFilter` consente di filtrare la risposta dell'API `ListShards`. L'unica proprietà richiesta del parametro `ShardFilter` è `Type`. KCL utilizza la proprietà di filtro `Type` e i seguenti valori validi per identificare e restituire uno snapshot delle partizioni aperte che potrebbero richiedere nuovi lease:
  + `AT_TRIM_HORIZON`: la risposta include tutte le partizioni che erano aperte in `TRIM_HORIZON`. 
  + `AT_LATEST`: la risposta include solo le partizioni del flusso di dati correntemente aperte. 
  + `AT_TIMESTAMP`: la risposta include tutte le partizioni il cui timestamp di inizio è inferiore o uguale al timestamp specificato e il timestamp di fine è maggiore o uguale al timestamp specificato o sono ancora aperte.

  `ShardFilter` viene utilizzato durante la creazione di lease per una tabella di lease vuota per inizializzare i lease per uno snapshot delle partizioni specificate in `RetrievalConfig#initialPositionInStreamExtended`.

  Per ulteriori informazioni su `ShardFilter`, consultare [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html).
+ Invece che tutti i lavoratori eseguano la lease/shard sincronizzazione per mantenere la tabella dei leasing aggiornata con gli shard più recenti nel flusso di dati, un unico responsabile dei lavoratori eletto esegue la sincronizzazione tra lease e shard.
+ KCL 2.3 utilizza il parametro `ChildShards` return di `GetRecords` and the `SubscribeToShard` APIs per eseguire la lease/shard sincronizzazione che avviene per gli shard chiusi, consentendo a un worker KCL di creare leasing solo `SHARD_END` per i frammenti secondari dello shard che ha terminato l'elaborazione. Per la condivisione tra le applicazioni consumer, questa ottimizzazione della lease/shard sincronizzazione utilizza il parametro dell'API. `ChildShards` `GetRecords` Per le applicazioni consumer con throughput dedicato (fan-out avanzato), questa ottimizzazione della lease/shard sincronizzazione utilizza il parametro dell'`ChildShards`API. `SubscribeToShard` Per ulteriori informazioni, consultare [GetRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html), [SubscribeToShards](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html) e [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html).
+ Con le modifiche di cui sopra, il comportamento della KCL sta passando dal modello in cui tutti i worker apprendono tutte le partizioni esistenti a un modello in cui i worker apprendono solo le partizioni secondarie delle partizioni di proprietà di ogni worker. Pertanto, oltre alla sincronizzazione che avviene durante gli eventi di bootstraping e reshard delle applicazioni consumer, KCL ora esegue anche shard/lease scansioni periodiche aggiuntive per identificare eventuali buchi nella tabella di lease (in altre parole, per conoscere tutti i nuovi shard) per garantire l'elaborazione dell'intervallo hash completo del flusso di dati e creare contratti di leasing, se necessario. `PeriodicShardSyncManager`è lease/shard il componente responsabile dell'esecuzione delle scansioni periodiche. 

  Per ulteriori informazioni su `PeriodicShardSyncManager` KCL 2.3, vedere [https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L201-L213) \$1L201 -L213.

  In KCL 2.3, sono disponibili nuove opzioni di configurazione per configurare `PeriodicShardSyncManager` in `LeaseManagementConfig`:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/streams/latest/dev/shared-throughput-kcl-consumers.html)

  Ora vengono inoltre emesse nuove CloudWatch metriche per monitorare lo stato di. `PeriodicShardSyncManager` Per ulteriori informazioni, consulta [PeriodicShardSyncManager](monitoring-with-kcl.md#periodic-task).
+ Inclusa un'ottimizzazione a `HierarchicalShardSyncer` per creare lease solo per un livello di partizioni.

#### Sincronizzazione in KCL 1.x, a partire da KCL 1.14 e versioni successive
<a name="shared-throughput-kcl-consumers-leasetable-sync-new-kcl1"></a>

A partire dalle ultime versioni supportate di KCL 1.x (KCL 1.14) e successive, la libreria ora supporta le seguenti modifiche al processo di sincronizzazione. Queste modifiche lease/shard alla sincronizzazione riducono significativamente il numero di chiamate API effettuate dalle applicazioni consumer KCL al servizio Kinesis Data Streams e ottimizzano la gestione del leasing nell'applicazione consumer KCL. 
+ Durante l'avvio dell'applicazione, se la tabella di lease è vuota, la KCL utilizza l'opzione di filtro dell'API `ListShard` (il parametro di richiesta `ShardFilter` facoltativo) per recuperare e creare lease solo per uno snapshot di partizioni aperte nel momento specificato dal parametro `ShardFilter`. Il parametro `ShardFilter` consente di filtrare la risposta dell'API `ListShards`. L'unica proprietà richiesta del parametro `ShardFilter` è `Type`. KCL utilizza la proprietà di filtro `Type` e i seguenti valori validi per identificare e restituire uno snapshot delle partizioni aperte che potrebbero richiedere nuovi lease:
  + `AT_TRIM_HORIZON`: la risposta include tutte le partizioni che erano aperte in `TRIM_HORIZON`. 
  + `AT_LATEST`: la risposta include solo le partizioni del flusso di dati correntemente aperte. 
  + `AT_TIMESTAMP`: la risposta include tutte le partizioni il cui timestamp di inizio è inferiore o uguale al timestamp specificato e il timestamp di fine è maggiore o uguale al timestamp specificato o sono ancora aperte.

  `ShardFilter` viene utilizzato durante la creazione di lease per una tabella di lease vuota per inizializzare i lease per uno snapshot delle partizioni specificate in `KinesisClientLibConfiguration#initialPositionInStreamExtended`.

  Per ulteriori informazioni su `ShardFilter`, consultare [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html).
+ Invece che tutti i lavoratori eseguano la lease/shard sincronizzazione per mantenere la tabella dei leasing aggiornata con gli shard più recenti nel flusso di dati, un unico responsabile dei lavoratori eletto esegue la sincronizzazione tra lease e shard.
+ KCL 1.14 utilizza il parametro `ChildShards` return di `GetRecords` and the `SubscribeToShard` APIs per eseguire la lease/shard sincronizzazione che avviene per gli shard chiusi, consentendo a un worker KCL di creare leasing solo `SHARD_END` per i frammenti secondari dello shard che ha terminato l'elaborazione. Per ulteriori informazioni, consultare [GetRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html) e [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html).
+ Con le modifiche di cui sopra, il comportamento della KCL sta passando dal modello in cui tutti i worker apprendono tutte le partizioni esistenti a un modello in cui i worker apprendono solo le partizioni secondarie delle partizioni di proprietà di ogni worker. Pertanto, oltre alla sincronizzazione che avviene durante gli eventi di bootstraping e reshard delle applicazioni consumer, KCL ora esegue anche shard/lease scansioni periodiche aggiuntive per identificare eventuali buchi nella tabella di lease (in altre parole, per conoscere tutti i nuovi shard) per garantire l'elaborazione dell'intervallo hash completo del flusso di dati e creare contratti di leasing, se necessario. `PeriodicShardSyncManager`è lease/shard il componente responsabile dell'esecuzione delle scansioni periodiche. 

  Quando `KinesisClientLibConfiguration#shardSyncStrategyType` è impostato su `ShardSyncStrategyType.SHARD_END`, `PeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThreshold` viene utilizzato per determinare la soglia per il numero di scansioni consecutive contenenti buchi nella tabella di lease, dopodiché imporre la sincronizzazione delle partizioni. Quando `KinesisClientLibConfiguration#shardSyncStrategyType` è impostato su `ShardSyncStrategyType.PERIODIC`, `leasesRecoveryAuditorInconsistencyConfidenceThreshold` viene ignorato.

  Per ulteriori informazioni su `PeriodicShardSyncManager` KCL 1.14, vedere [https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L987-L999) \$1L987 -L999.

  In KCL 1.14, è disponibile una nuova opzione di configurazione per configurare `PeriodicShardSyncManager` in `LeaseManagementConfig`:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/streams/latest/dev/shared-throughput-kcl-consumers.html)

  Ora vengono inoltre emesse nuove CloudWatch metriche per monitorare lo stato di. `PeriodicShardSyncManager` Per ulteriori informazioni, consulta [PeriodicShardSyncManager](monitoring-with-kcl.md#periodic-task).
+ KCL 1.14 ora supporta anche la pulizia differita dei lease. I lease vengono eliminati in modo asincrono da `LeaseCleanupManager` quando viene raggiunto `SHARD_END` o quando una partizione è scaduta, superando il periodo di conservazione del flusso di dati o è stata chiusa a seguito di un'operazione di ripartizionamento.

  Sono disponibili nuove opzioni di configurazione per configurare `LeaseCleanupManager`.    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/streams/latest/dev/shared-throughput-kcl-consumers.html)
+ Inclusa un'ottimizzazione a `KinesisShardSyncer` per creare lease solo per un livello di partizioni.

## Elabora più flussi di dati con la stessa applicazione consumer KCL 2.x per Java
<a name="shared-throughput-kcl-multistream"></a>

Questa sezione descrive le seguenti modifiche a KCL 2.x per Java che consentono di creare applicazioni consumer KCL in grado di elaborare più di un flusso di dati contemporaneamente. 

**Importante**  
L'elaborazione multistream è supportata solo in KCL 2.x per Java, a partire da KCL 2.3 per Java e versioni successive.   
L'elaborazione multistream NON è supportata per altri linguaggi in cui è possibile implementare KCL 2.x.  
L'elaborazione multistream NON è supportata in nessuna versione di KCL 1.x.
+ **MultistreamTracker interfaccia**

  Per creare un'applicazione consumer in grado di elaborare più flussi contemporaneamente, è necessario implementare una nuova interfaccia denominata [MultistreamTracker](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java). Questa interfaccia include il metodo `streamConfigList` che restituisce l'elenco dei flussi di dati e le relative configurazioni che devono essere elaborati dall'applicazione consumer KCL. Si noti che i flussi di dati in fase di elaborazione possono essere modificati durante il runtime dell'applicazione consumer. `streamConfigList` viene chiamato periodicamente dalla KCL per conoscere le modifiche nei flussi di dati da elaborare.

  Il `streamConfigList` metodo compila l'elenco. [StreamConfig](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java#L23) 

  ```
  package software.amazon.kinesis.common;
  
  import lombok.Data;
  import lombok.experimental.Accessors;
  
  @Data
  @Accessors(fluent = true)
  public class StreamConfig {
      private final StreamIdentifier streamIdentifier;
      private final InitialPositionInStreamExtended initialPositionInStreamExtended;
      private String consumerArn;
  }
  ```

  Nota che `StreamIdentifier` e `InitialPositionInStreamExtended` sono obbligatori, mentre `consumerArn` è facoltativo. È necessario fornire `consumerArn` solo se si utilizza KCL 2.x per implementare un'applicazione consumer fan-out migliorata.

  Per ulteriori informazioni su`StreamIdentifier`, vedere [https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java \$1L129.](https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129) Per creare un`StreamIdentifier`, ti consigliamo di creare un'istanza multistream da `streamArn` and the disponibile nella versione `streamCreationEpoch` 2.5.0 e successive. In KCL v2.3 e v2.4, che non supportano`streamArm`, crea un'istanza multistream utilizzando il formato. `account-id:StreamName:streamCreationTimestamp` Questo formato sarà obsoleto e non sarà più supportato a partire dalla prossima versione principale.

  `MultistreamTracker` include anche una strategia per eliminare i lease di vecchi flussi nella tabella dei lease (`formerStreamsLeasesDeletionStrategy`). Si noti che la strategia NON PUÒ essere modificata durante il runtime dell'applicazione consumer. [Per maggiori informazioni, consulta /blob/0c5042dadf794fe988438436252a5a8fe70b6b0 .java https://github.com/awslabs/ amazon-kinesis-client b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java)
+ [ConfigsBuilder](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java)è una classe a livello di applicazione che puoi utilizzare per specificare tutte le impostazioni di configurazione KCL 2.x da utilizzare durante la creazione dell'applicazione consumer KCL. `ConfigsBuilder`la classe ora supporta l'interfaccia. `MultistreamTracker` Puoi inizializzarli ConfigsBuilder entrambi con il nome dell'unico flusso di dati da cui consumare i record da:

  ```
   /**
       * Constructor to initialize ConfigsBuilder with StreamName
       * @param streamName
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.right(streamName);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```

  Oppure puoi inizializzare ConfigsBuilder con `MultiStreamTracker` se desideri implementare un'applicazione consumer KCL che elabora più flussi contemporaneamente.

  ```
  * Constructor to initialize ConfigsBuilder with MultiStreamTracker
       * @param multiStreamTracker
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.left(multiStreamTracker);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```
+ Con il supporto multistream implementato per la tua applicazione consumer KCL, ogni riga della tabella di lease dell'applicazione ora contiene l'ID della partizione e il nome del flusso dei molteplici flussi di dati elaborati da questa applicazione. 
+ Quando viene implementato il supporto multistream per la tua applicazione consumer KCL, leaseKey assume la seguente struttura: `account-id:StreamName:streamCreationTimestamp:ShardId`. Ad esempio, `111111111:multiStreamTest-1:12345:shardId-000000000336`.
**Importante**  
Quando l'applicazione consumer KCL esistente è configurata per elaborare un solo flusso di dati, leaseKey (che è la chiave hash per la tabella di lease) è l'ID della partizione. Se riconfiguri questa applicazione consumer KCL esistente per elaborare più flussi di dati, la tabella di lease viene interrotta perché con il supporto multistream, la struttura leaseKey deve essere `account-id:StreamName:StreamCreationTimestamp:ShardId`.

## Usa KCL con lo Schema Registry AWS Glue
<a name="shared-throughput-kcl-consumers-glue-schema-registry"></a>

Puoi integrare i tuoi flussi di dati Kinesis con lo Schema Registry. AWS Glue Lo AWS Glue Schema Registry ti consente di scoprire, controllare ed evolvere centralmente gli schemi, garantendo al contempo che i dati prodotti siano convalidati continuamente da uno schema registrato. Uno schema definisce la struttura e il formato di un registro di dati. Uno schema è una specifica con versioni per la pubblicazione, il consumo o l'archiviazione dei dati in modo affidabile. Lo AWS Glue Schema Registry consente di migliorare la qualità e la governance end-to-end dei dati all'interno delle applicazioni di streaming. Per ulteriori informazioni, consulta [Registro degli schemi di AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). Uno dei modi per configurare questa integrazione è tramite la KCL in Java. 

**Importante**  
Attualmente, l'integrazione tra Kinesis Data AWS Glue Streams e Schema Registry è supportata solo per i flussi di dati Kinesis che utilizzano i consumatori KCL 2.3 implementati in Java. Il supporto multilingue non viene fornito. I consumer KCL 1.0 non sono supportati. I consumer KCL 2.x precedenti a KCL 2.3 non sono supportati.

Per istruzioni dettagliate su come configurare l'integrazione di Kinesis Data Streams con Schema Registry utilizzando KCL, consulta la [sezione «Interazione con i dati KPL/KCL utilizzando le librerie» in Caso d'uso: integrazione di Amazon Kinesis Data Streams](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds) con il registro dello schema Glue. AWS 

# Sviluppa consumatori personalizzati con un throughput condiviso
<a name="shared-throughput-consumers"></a>

**Importante**  
Le versioni 1.x e 2.x di Amazon KCL sono obsolete. KCL 1.x arriverà il 30 end-of-support gennaio 2026. **Ti consigliamo vivamente di** migrare le tue applicazioni KCL utilizzando la versione 1.x all'ultima versione di KCL prima del 30 gennaio 2026. Per trovare la versione più recente di KCL, consulta la pagina [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) su. GitHub Per informazioni sulle ultime versioni di KCL, consulta. [Usa la libreria client Kinesis](kcl.md) Per informazioni sulla migrazione da KCL 1.x a KCL 3.x, consulta. [Migrazione di KLC da 1.x a 3.x](kcl-migration-1-3.md)

Se non è necessaria una velocità di trasmissione effettiva dedicata durante la ricezione dei dati dal flusso di dati Kinesis e se non sono necessari ritardi di propagazione della lettura inferiori a 200 ms, puoi creare applicazioni consumater come descritto nei seguenti argomenti. È possibile utilizzare la Kinesis Client Library (KCL) o il. AWS SDK per Java

**Topics**
+ [Sviluppa consumatori personalizzati con un throughput condiviso utilizzando KCL](custom-kcl-consumers.md)

Per ulteriori informazioni sulla creazione di consumer che possono ricevere record dal flusso di dati Kinesis con velocità di trasmissione effettiva dedicata, consulta [Sviluppa consumatori con fan-out migliorati con un throughput dedicato](enhanced-consumers.md).

# Sviluppa consumatori personalizzati con un throughput condiviso utilizzando KCL
<a name="custom-kcl-consumers"></a>

**Importante**  
Le versioni 1.x e 2.x di Amazon KCL sono obsolete. KCL 1.x arriverà il 30 end-of-support gennaio 2026. **Ti consigliamo vivamente di** migrare le tue applicazioni KCL utilizzando la versione 1.x all'ultima versione di KCL prima del 30 gennaio 2026. Per trovare la versione più recente di KCL, consulta la pagina [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) su. GitHub Per informazioni sulle ultime versioni di KCL, consulta. [Usa la libreria client Kinesis](kcl.md) Per informazioni sulla migrazione da KCL 1.x a KCL 3.x, consulta. [Migrazione di KLC da 1.x a 3.x](kcl-migration-1-3.md)

Uno dei metodi per sviluppare un'applicazione consumer personalizzata con velocità di trasmissione effettiva condivisa consiste nell'utilizzare la Kinesis Client Library (KCL). 

Scegli tra i seguenti argomenti per la versione di KCL che stai utilizzando.

**Topics**
+ [Sviluppa i consumatori di KCL 1.x](developing-consumers-with-kcl.md)
+ [Sviluppa KCL 2.x Consumers](developing-consumers-with-kcl-v2.md)

# Sviluppa i consumatori di KCL 1.x
<a name="developing-consumers-with-kcl"></a>

**Importante**  
Le versioni 1.x e 2.x di Amazon KCL sono obsolete. KCL 1.x arriverà il 30 end-of-support gennaio 2026. **Ti consigliamo vivamente di** migrare le tue applicazioni KCL utilizzando la versione 1.x all'ultima versione di KCL prima del 30 gennaio 2026. Per trovare la versione più recente di KCL, consulta la pagina [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) su. GitHub Per informazioni sulle ultime versioni di KCL, consulta. [Usa la libreria client Kinesis](kcl.md) Per informazioni sulla migrazione da KCL 1.x a KCL 3.x, consulta. [Migrazione di KLC da 1.x a 3.x](kcl-migration-1-3.md)

È possibile sviluppare un'applicazione consumer per flusso di dati Amazon Kinesis utilizzando la Kinesis Client Library (KCL). 

Per ulteriori informazioni su KCL, consulta [Informazioni su KCL (versioni precedenti)](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-overview).

Scegli tra i seguenti argomenti a seconda dell'opzione che desideri utilizzare.

**Topics**
+ [Sviluppa un utente di Kinesis Client Library in Java](kinesis-record-processor-implementation-app-java.md)
+ [Sviluppa un utente della Kinesis Client Library in Node.js](kinesis-record-processor-implementation-app-nodejs.md)
+ [Sviluppa un utente di Kinesis Client Library in .NET](kinesis-record-processor-implementation-app-dotnet.md)
+ [Sviluppa un utente della Kinesis Client Library in Python](kinesis-record-processor-implementation-app-py.md)
+ [Sviluppa un utente di Kinesis Client Library in Ruby](kinesis-record-processor-implementation-app-ruby.md)

# Sviluppa un utente di Kinesis Client Library in Java
<a name="kinesis-record-processor-implementation-app-java"></a>

**Importante**  
Le versioni 1.x e 2.x di Amazon KCL sono obsolete. KCL 1.x arriverà il 30 end-of-support gennaio 2026. **Ti consigliamo vivamente di** migrare le tue applicazioni KCL utilizzando la versione 1.x all'ultima versione di KCL prima del 30 gennaio 2026. Per trovare la versione più recente di KCL, consulta la pagina [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) su. GitHub Per informazioni sulle ultime versioni di KCL, consulta. [Usa la libreria client Kinesis](kcl.md) Per informazioni sulla migrazione da KCL 1.x a KCL 3.x, consulta. [Migrazione di KLC da 1.x a 3.x](kcl-migration-1-3.md)

È possibile utilizzare la Kinesis Client Library (KCL) per creare applicazioni che elaborano dati dai tuoi flussi di dati Kinesis. La Kinesis Client Library è disponibile in più linguaggi. In questo argomento viene discusso Java. [Per visualizzare il riferimento a Javadoc, consultate l'argomento Javadoc per Class.AWS AmazonKinesisClient](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html)

Per scaricare Java KCL da GitHub, vai a [Kinesis Client Library (](https://github.com/awslabs/amazon-kinesis-client)Java). Per individuare la KCL Java su Apache Maven, vai alla pagina [Risultati di ricerca di KCL](https://search.maven.org/#search|ga|1|amazon-kinesis-client). Per scaricare il codice di esempio per un'applicazione consumer Java KCL da GitHub, vai alla pagina del progetto di [esempio KCL for Java](https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis) su. GitHub 

L'applicazione di esempio utilizza [Apache Commons Logging](http://commons.apache.org/proper/commons-logging/guide.html). È possibile modificare la configurazione di registro nel metodo statico `configure` definito nel file `AmazonKinesisApplicationSample.java`. [https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/java-dg-logging.html](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/java-dg-logging.html)

È necessario completare le seguenti attività durante l'implementazione di un'applicazione consumer KCL in Java:

**Topics**
+ [Implementa i metodi del processore IRecord](#kinesis-record-processor-implementation-interface-java)
+ [Implementa una fabbrica di classi per l'interfaccia IRecord Processor](#kinesis-record-processor-implementation-factory-java)
+ [Crea un lavoratore](#kcl-java-worker)
+ [Modificare le proprietà di configurazione](#kinesis-record-processor-initialization-java)
+ [Esegui la migrazione alla versione 2 dell'interfaccia del processore di registrazione](#kcl-java-v2-migration)

## Implementa i metodi del processore IRecord
<a name="kinesis-record-processor-implementation-interface-java"></a>

La KCL supporta attualmente due versioni dell'interfaccia `IRecordProcessor`: l'interfaccia originale è disponibile con la prima versione della KCL e la versione 2 è disponibile a partire da KCL versione 1.5.0. Entrambe le interfacce sono completamente supportate. La scelta dipende dai tuoi requisiti specifici di scenario. Fai riferimento ai tuoi Javadocs locali o al codice sorgente per visualizzare tutte le differenze. Le seguenti sezioni delineano l'implementazione minima per iniziare.

**Topics**
+ [Interfaccia originale (Versione 1)](#kcl-java-interface-original)
+ [Interfaccia aggiornata (versione 2)](#kcl-java-interface-v2)

### Interfaccia originale (Versione 1)
<a name="kcl-java-interface-original"></a>

L'interfaccia `IRecordProcessor` originale (`package com.amazonaws.services.kinesis.clientlibrary.interfaces`) espone i seguenti metodi di processore del record che il tuo consumer deve implementare. L'esempio fornisce implementazioni che è possibile utilizzare come punto di partenza (consulta `AmazonKinesisApplicationSampleRecordProcessor.java`).

```
public void initialize(String shardId)
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

**initialize**  
La KCL chiama il metodo `initialize` quando viene creata un'istanza del processore di record, passando un ID della partizione specifico come parametro. Questo processore di record elabora esclusivamente questo shard e, in genere, è vero anche il contrario (questo shard è elaborato solo da questo processore di record). Tuttavia, il tuo consumer deve tenere conto della possibilità che un record di dati possa essere elaborato più di una volta. Il flusso di dati Kinesis ha una semantica *almeno una volta*, il che significa che ogni record di dati da una partizione viene elaborato almeno una volta da un worker nel tuo consumer. Per ulteriori informazioni sui casi in cui un determinato shard può essere elaborato da più di un lavoratore, consulta [Usa il resharding, lo scaling e l'elaborazione parallela per modificare il numero di shard](kinesis-record-processor-scaling.md).

```
public void initialize(String shardId)
```

**processRecords**  
La KCL chiama il metodo `processRecords` e passa un elenco di record di dati dalla partizione specificata dal metodo `initialize(shardId)`. Il processore di record elabora i dati in questi record in base alla semantica del consumer. Ad esempio, il worker potrebbe eseguire una trasformazione dei dati e, successivamente, archiviare il risultato in un bucket Amazon Simple Storage Service (Amazon S3).

```
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) 
```

Oltre ai dati stessi, il record contiene anche un numero di sequenza e una chiave di partizione. Il lavoratore può utilizzare questi valori quando elabora i dati. Ad esempio, il lavoratore può scegliere il bucket S3 in cui archiviare i dati in base al valore della chiave di partizione. La classe `Record` espone i seguenti metodi che forniscono l'accesso ai dati del record, al numero di sequenza e alla chiave di partizione. 

```
record.getData()  
record.getSequenceNumber() 
record.getPartitionKey()
```

Nell'esempio, il metodo privato `processRecordsWithRetries` ha un codice che mostra in che modo un lavoratore può accedere ai dati del record, al numero di sequenza e alla chiave di partizione.

Il flusso di dati Kinesis richiede che il processore di record tenga traccia dei record che sono già stati elaborati in una partizione. La KCL si occupa di questo monitoraggio per te, passando un checkpointer (`IRecordProcessorCheckpointer`) a `processRecords`. Il processore di record chiama il metodo `checkpoint` in questa interfaccia per comunicare alla KCL quanto si è progredito nell'elaborazione dei record nella partizione. In caso di errore del worker, la KCL utilizza queste informazioni per riavviare l'elaborazione della partizione nell'ultimo record elaborato conosciuto.

Per le operazioni di divisione o unione, la KCL non avvierà l'elaborazione delle nuove partizioni fino a quando i processori delle partizioni originali non avranno chiamato `checkpoint` per segnalare che l'intera elaborazione delle partizioni originali è completa.

Se non viene passato un parametro, la KCL suppone che la chiamata a `checkpoint` significa che tutti i record sono stati elaborati, fino all'ultimo record passato al processore di record. Pertanto, il processore di record deve chiamare `checkpoint` solo dopo aver elaborato tutti i record nell'elenco passato al processore. I processori di record non devono chiamare `checkpoint` in ciascuna chiamata a `processRecords`. Un processore potrebbe, per esempio, chiamare `checkpoint` in ogni terza chiamata a `processRecords`. Puoi specificare, in modo facoltativo, il numero di sequenza esatto di un record come parametro per `checkpoint`. In questo caso, la KCL presuppone che tutti i record siano stati elaborati esclusivamente fino a tale record.

Nell'esempio, il metodo privato `checkpoint` mostra come effettuare la chiamata a `IRecordProcessorCheckpointer.checkpoint` utilizzando la gestione delle eccezioni e la logica dei nuovi tentativi appropriate.

La KCL si basa su `processRecords` per gestire eventuali eccezioni generate dall'elaborazione dei record di dati. Se viene generata un'eccezione da `processRecords`, la KCL omette i record di dati passati prima dell'eccezione. Ciò significa che questi record non sono inviati nuovamente al processore di record che ha generato l'eccezione o a qualsiasi altro processore di record nel consumer.

**shutdown**  
La KCL chiama il metodo `shutdown` sia al termine dell'elaborazione (il motivo dell'arresto è `TERMINATE`) che quando il worker non risponde più (il motivo dell'arresto è `ZOMBIE`).

```
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

L'elaborazione termina quando il processore di record non riceve ulteriori record dallo shard, perché lo shard è stato frazionato o fuso o perché il flusso è stato eliminato.

La KCL trasferisce inoltre un'interfaccia `IRecordProcessorCheckpointer` a `shutdown`. Se il motivo dell'arresto è `TERMINATE`, il processore di record deve terminare l'elaborazione di qualsiasi record di dati e, di seguito, chiamare il metodo `checkpoint` in questa interfaccia.

### Interfaccia aggiornata (versione 2)
<a name="kcl-java-interface-v2"></a>

L'interfaccia `IRecordProcessor` aggiornata (`package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`) espone i seguenti metodi di processore del record che il tuo consumer deve implementare: 

```
void initialize(InitializationInput initializationInput)
void processRecords(ProcessRecordsInput processRecordsInput)
void shutdown(ShutdownInput shutdownInput)
```

Tutti gli argomenti dalla versione originale dell'interfaccia sono accessibili tramite metodi get negli oggetti del container. Ad esempio, per recuperare l'elenco dei record in `processRecords()`, è possibile utilizzare `processRecordsInput.getRecords()`.

A partire dalla versione 2 di questa interfaccia (KCL 1.5.0 e versioni successive), i seguenti nuovi input sono disponibili in aggiunta agli input forniti dall'interfaccia originale:

Numero di sequenza di partenza  
Nell'oggetto `InitializationInput` passato all'operazione `initialize()`, il numero di sequenza iniziale a partire da cui i record verrebbero forniti all'istanza del processore di record. Questo è l'ultimo numero di sequenza in cui è stato eseguito il checkpoint dall'istanza del processore di record che aveva precedentemente elaborato lo stesso shard. Questi dati sono forniti nel caso in cui la tua applicazione necessiti di queste informazioni. 

Numero di sequenza di checkpoint in sospeso  
Nell'oggetto `InitializationInput` passato all'operazione `initialize()`, il numero di sequenza di checkpoint in sospeso (se del caso) che non è stato possibile confermare prima dell'arresto dell'istanza precedente del processore di record.

## Implementa una fabbrica di classi per l'interfaccia IRecord Processor
<a name="kinesis-record-processor-implementation-factory-java"></a>

È inoltre necessario implementare un generatore per la classe che implementa i metodi del processore di record. Quando il tuo consumer avvia un'istanza del lavoratore, passa un riferimento a questo generatore.

Il campione implementa il generatore di classe nel file `AmazonKinesisApplicationSampleRecordProcessorFactory.java` utilizzando l'interfaccia del processore di record originale. Se si desidera che il generatore di classe crei processori di record della versione 2, utilizzi il nome del pacchetto `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`.

```
  public class SampleRecordProcessorFactory implements IRecordProcessorFactory { 
      /**
      * Constructor.
      */
      public SampleRecordProcessorFactory() {
          super();
      }
      /**
      * {@inheritDoc}
      */
      @Override
      public IRecordProcessor createProcessor() {
          return new SampleRecordProcessor();
      }
  }
```

## Crea un lavoratore
<a name="kcl-java-worker"></a>

Come discusso nella [Implementa i metodi del processore IRecord](#kinesis-record-processor-implementation-interface-java), ci sono due versioni dell'interfaccia del processore di record KCL da cui scegliere; ciò influenza il modo in cui è possibile creare un worker. L'interfaccia del processore di record originale utilizza la seguente struttura di codice per creare un lavoratore:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker(recordProcessorFactory, config);
```

Con la versione 2 dell'interfaccia del processore di record, è possibile utilizzare `Worker.Builder` per creare un lavoratore senza dover preoccuparsi di quale costruttore utilizzare e dell'ordine degli argomenti. L'interfaccia del processore di record aggiornata utilizza la seguente struttura di codice per creare un lavoratore:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

## Modificare le proprietà di configurazione
<a name="kinesis-record-processor-initialization-java"></a>

L'esempio fornisce valori di default per le proprietà di configurazione. Questi dati di configurazione per il lavoratore sono poi consolidati in un oggetto `KinesisClientLibConfiguration`. Questo oggetto e un riferimento al generatore di classe per `IRecordProcessor` sono passati nella chiamata che avvia un'istanza del lavoratore. È possibile sostituire una qualsiasi di queste proprietà con i tuoi valori utilizzando un file di proprietà Java (consulta `AmazonKinesisApplicationSample.java`).

### Application name (Nome applicazione)
<a name="configuration-property-application-name"></a>

La KCL richiede un nome dell'applicazione univoco per tutte le applicazioni e per tutte le tabelle Amazon DynamoDB nella stessa Regione. La biblioteca utilizza il valore di configurazione del nome dell'applicazione nei seguenti modi:
+ Si suppone che tutti i lavoratori associati con questo nome dell'applicazione stiano lavorando insieme nello stesso flusso. Questi lavoratori potrebbero essere distribuiti su più istanze. Se si esegue un'istanza aggiuntiva dello stesso codice dell'applicazione, ma con un nome dell'applicazione diverso, la KCL tratta la seconda istanza come un'applicazione completamente separata che opera anch'essa nello stesso flusso.
+ La KCL crea una tabella DynamoDB con il nome dell'applicazione e la utilizza per mantenere le informazioni sullo stato (ad esempio, checkpoint e mappatura worker-partizione) per l'applicazione. Ogni applicazione ha la propria tabella DynamoDB. Per ulteriori informazioni, consulta [Utilizza una tabella di leasing per tenere traccia degli shard elaborati dall'applicazione consumer KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configurare le credenziali
<a name="kinesis-record-processor-cred-java"></a>

È necessario rendere disponibili le AWS credenziali a uno dei provider di credenziali della catena di provider di credenziali predefinita. Ad esempio, se l'applicazione consumer è in esecuzione su un'istanza Amazon EC2, consigliamo di avviare l'istanza con un ruolo IAM. Le credenziali AWS che riflettono le autorizzazioni associate a questo ruolo IAM vengono rese disponibili alle applicazioni sull'istanza tramite i relativi metadati dell'istanza. Questo è il modo più sicuro per gestire le credenziali per un consumer in esecuzione in un'istanza EC2.

L'applicazione di esempio prova prima a recuperare le credenziali IAM dai metadati dell'istanza: 

```
credentialsProvider = new InstanceProfileCredentialsProvider(); 
```

Se l'applicazione di esempio non è in grado di ottenere le credenziali dai metadati dell'istanza, tenta di recuperare le credenziali da un file proprietà:

```
credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
```

Per ulteriori informazioni sui metadati delle istanze, consulta [Instance Metadata](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html) nella *Amazon EC2* User Guide.

### Usa l'ID del lavoratore per più istanze
<a name="kinesis-record-processor-workerid-java"></a>

L'esempio di codice di inizializzazione crea un ID per il lavoratore, `workerId`utilizzando il nome del computer locale e aggiungendo un identificatore univoco globale come illustrato nel seguente frammento di codice. Questo approccio supporta lo scenario di più istanze dell'applicazione di consumo in esecuzione in un singolo computer.

```
String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
```

## Esegui la migrazione alla versione 2 dell'interfaccia del processore di registrazione
<a name="kcl-java-v2-migration"></a>

Se si desidera migrare il codice che utilizza l'interfaccia originale, in aggiunta ai passaggi descritti in precedenza, sono necessari i seguenti passaggi:

1. Cambia la classe del tuo processore di record per importare la versione 2 dell'interfaccia del processore di record:

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   ```

1. Cambia i riferimenti per gli input per utilizzare i metodi `get` negli oggetti del container. Ad esempio, nell'operazione `shutdown()`, cambia "`checkpointer`" con "`shutdownInput.getCheckpointer()`".

1. Cambia la classe del generatore del processore di record per importare la versione 2 dell'interfaccia del generatore del processore di record:

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   ```

1. Cambia la costruzione del lavoratore per utilizzare `Worker.Builder`. Esempio:

   ```
   final Worker worker = new Worker.Builder()
       .recordProcessorFactory(recordProcessorFactory)
       .config(config)
       .build();
   ```

# Sviluppa un utente della Kinesis Client Library in Node.js
<a name="kinesis-record-processor-implementation-app-nodejs"></a>

**Importante**  
Le versioni 1.x e 2.x di Amazon KCL sono obsolete. KCL 1.x sarà disponibile il 30 end-of-support gennaio 2026. **Ti consigliamo vivamente di** migrare le tue applicazioni KCL utilizzando la versione 1.x all'ultima versione di KCL prima del 30 gennaio 2026. Per trovare la versione più recente di KCL, consulta la pagina [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) su. GitHub Per informazioni sulle ultime versioni di KCL, consulta. [Usa la libreria client Kinesis](kcl.md) Per informazioni sulla migrazione da KCL 1.x a KCL 3.x, consulta. [Migrazione di KLC da 1.x a 3.x](kcl-migration-1-3.md)

È possibile utilizzare la Kinesis Client Library (KCL) per creare applicazioni che elaborano dati dai tuoi flussi di dati Kinesis. La Kinesis Client Library è disponibile in più linguaggi. In questo argomento viene discusso Node.js.

KCL è una libreria Java; il supporto per linguaggi diversi da Java viene fornito utilizzando un'interfaccia multilingue chiamata. *MultiLangDaemon* Questo daemon è basato su Java e viene eseguito in background quando si utilizza un linguaggio KCL diverso da Java. Pertanto, se installi KCL per Node.js e scrivi la tua app consumer interamente in Node.js, avrai comunque bisogno che Java sia installato sul tuo sistema a causa di. MultiLangDaemon Inoltre, MultiLangDaemon presenta alcune impostazioni predefinite che potresti dover personalizzare in base al tuo caso d'uso, ad esempio la AWS regione a cui si connette. Per ulteriori informazioni su MultiLangDaemon on GitHub, vai alla pagina del [ MultiLangDaemon progetto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Per scaricare il file KCL Node.js da GitHub, vai alla [Kinesis Client Library (Node.js)](https://github.com/awslabs/amazon-kinesis-client-nodejs).

**Download di codice di esempio**

Ci sono due esempi di codice disponibili per KCL in Node.js:
+ [basic-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/basic_sample)

  Viene utilizzato nelle seguenti sezioni per illustrare i concetti fondamentali della costruzione di un'applicazione consumer KCL in Node.js.
+ [click-stream-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/click_stream_sample)

   Leggermente più avanzato e utilizza uno scenario reale. Da utilizzare dopo avere acquisito familiarità con il codice di esempio di base. Questo esempio non è discusso qui, ma dispone di un file README con ulteriori informazioni.

È necessario completare le seguenti attività durante l'implementazione di un'applicazione consumer KCL in Node.js:

**Topics**
+ [Implementa il processore di registrazione](#kinesis-record-processor-implementation-interface-nodejs)
+ [Modifica le proprietà di configurazione](#kinesis-record-processor-initialization-nodejs)

## Implementa il processore di registrazione
<a name="kinesis-record-processor-implementation-interface-nodejs"></a>

Il consumer più semplice possibile che utilizza la KCL per Node.js deve implementare una funzione `recordProcessor`, che a sua volta contiene le funzioni `initialize`, `processRecords` e `shutdown`. L'esempio fornisce un'implementazione che è possibile utilizzare come punto di partenza (consulta `sample_kcl_app.js`).

```
function recordProcessor() {
  // return an object that implements initialize, processRecords and shutdown functions.}
```

**initialize**  
La KCL chiama la funzione `initialize` quando il processore di record si avvia. Questo processore di record elabora esclusivamente l'ID dello shard passato come `initializeInput.shardId` e, in genere, è vero anche il contrario (questo shard è elaborato solo da questo processore di record). Tuttavia, il tuo consumer deve tenere conto della possibilità che un record di dati possa essere elaborato più di una volta. Ciò si verifica perché il flusso di dati Kinesis ha una semantica *almeno una volta*, il che significa che ogni record di dati da una partizione viene elaborato almeno una volta da un worker nel tuo consumer. Per ulteriori informazioni sui casi in cui un determinato shard può essere elaborato da più di un lavoratore, consulta [Usa il resharding, lo scaling e l'elaborazione parallela per modificare il numero di shard](kinesis-record-processor-scaling.md).

```
initialize: function(initializeInput, completeCallback)
```

**processRecords**  
 La KCL chiama questa funzione con input che contiene un elenco di record di dati dalla partizione specificata alla funzione `initialize`. Il processore di record che implementi elabora i dati in questi record in base alla semantica del tuo consumer. Ad esempio, il worker potrebbe eseguire una trasformazione dei dati e, successivamente, archiviare il risultato in un bucket Amazon Simple Storage Service (Amazon S3). 

```
processRecords: function(processRecordsInput, completeCallback)
```

Oltre ai dati stessi, il record contiene anche un numero di sequenza e una chiave di partizione, che il lavoratore può utilizzare durante l'elaborazione dei dati. Ad esempio, il lavoratore può scegliere il bucket S3 in cui archiviare i dati in base al valore della chiave di partizione. Il dizionario `record` espone le seguenti coppie chiave-valore per accedere ai dati del record, al numero di sequenza e alla chiave di partizione:

```
record.data
record.sequenceNumber
record.partitionKey
```

Tieni presente che i dati sono codificati in Base64.

Nell'esempio di base, la funzione `processRecords` ha un codice che mostra in che modo un lavoratore può accedere ai dati del record, al numero di sequenza e alla chiave di partizione.

Il flusso di dati Kinesis richiede che il processore di record tenga traccia dei record che sono già stati elaborati in una partizione. La KCL si occupa di questo monitoraggio per un oggetto `checkpointer` passato come `processRecordsInput.checkpointer`. Il tuo processore di record chiama la funzione `checkpointer.checkpoint` per comunicare alla KCL quanto si è progredito nell'elaborazione dei record nella partizione. In caso di errore del worker, la KCL utilizza queste informazioni quando si riavvia l'elaborazione della partizione in modo tale che l'elaborazione continua dall'ultimo record elaborato conosciuto.

Per le operazioni di divisione o unione, la KCL non avvia l'elaborazione delle nuove partizioni fino a quando i processori delle partizioni originali non avranno chiamato `checkpoint` per segnalare che l'intera elaborazione delle partizioni originali è completa.

Se non viene passato il numero di sequenza alla funzione `checkpoint`, la KCL suppone che la chiamata a `checkpoint` significa che tutti i record sono stati elaborati, fino all'ultimo record passato al processore di record. Pertanto, il processore di record deve chiamare `checkpoint` **solo** dopo aver elaborato tutti i record nell'elenco passato al processore. I processori di record non devono chiamare `checkpoint` in ciascuna chiamata a `processRecords`. Un processore potrebbe, ad esempio, richiamare `checkpoint` ogni tre chiamate o qualche evento esterno al processore di dischi, come un verification/validation servizio personalizzato che hai implementato. 

Puoi specificare, in modo facoltativo, il numero di sequenza esatto di un record come parametro per `checkpoint`. In questo caso, la KCL presuppone che tutti i record siano stati elaborati esclusivamente fino a tale record.

L'applicazione di esempio di base mostra la chiamata più semplice possibile alla funzione `checkpointer.checkpoint`. È possibile aggiungere le altre logiche di creazione di checkpoint di cui hai bisogno per il tuo consumer a questo punto della funzione.

**shutdown**  
La KCL chiama la funzione `shutdown` sia al termine dell'elaborazione (`shutdownInput.reason` è `TERMINATE`) che quando il worker non risponde più (`shutdownInput.reason` è `ZOMBIE`).

```
shutdown: function(shutdownInput, completeCallback)
```

L'elaborazione termina quando il processore di record non riceve ulteriori record dallo shard, perché lo shard è stato frazionato o fuso o perché il flusso è stato eliminato.

La KCL trasferisce inoltre un oggetto `shutdownInput.checkpointer` a `shutdown`. Se il motivo dell'arresto è `TERMINATE`, è necessario assicurarsi che il processore di record abbia terminato l'elaborazione di qualsiasi record di dati e, di seguito, chiamare la funzione `checkpoint` in questa interfaccia.

## Modifica le proprietà di configurazione
<a name="kinesis-record-processor-initialization-nodejs"></a>

L'esempio fornisce valori di default per le proprietà di configurazione. È possibile sostituire una qualsiasi di queste proprietà con i tuoi valori (consulta `sample.properties` nell'esempio di base).

### Application name (Nome applicazione)
<a name="kinesis-record-processor-application-name-nodejs"></a>

La KCL richiede un nome dell'applicazione univoco per tutte le applicazioni e per tutte le tabelle Amazon DynamoDB nella stessa Regione. La biblioteca utilizza il valore di configurazione del nome dell'applicazione nei seguenti modi:
+ Si suppone che tutti i lavoratori associati con questo nome dell'applicazione stiano lavorando insieme nello stesso flusso. Questi lavoratori potrebbero essere distribuiti su più istanze. Se si esegue un'istanza aggiuntiva dello stesso codice dell'applicazione, ma con un nome dell'applicazione diverso, la KCL tratta la seconda istanza come un'applicazione completamente separata che opera anch'essa nello stesso flusso.
+ La KCL crea una tabella DynamoDB con il nome dell'applicazione e la utilizza per mantenere le informazioni sullo stato (ad esempio, checkpoint e mappatura worker-partizione) per l'applicazione. Ogni applicazione ha la propria tabella DynamoDB. Per ulteriori informazioni, consulta [Utilizza una tabella di leasing per tenere traccia degli shard elaborati dall'applicazione consumer KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configurare le credenziali
<a name="kinesis-record-processor-credentials-nodejs"></a>

È necessario rendere disponibili le AWS credenziali a uno dei provider di credenziali della catena di provider di credenziali predefinita. Puoi utilizzare la proprietà `AWSCredentialsProvider` per impostare un provider di credenziali. Il file `sample.properties` deve rendere le tue credenziali disponibili per uno dei provider di credenziali nella [catena di provider di credenziali di default](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Se esegui il tuo consumer su un'istanza Amazon EC2, ti consigliamo di configurare l'istanza con un ruolo IAM. AWS le credenziali che riflettono le autorizzazioni associate a questo ruolo IAM vengono rese disponibili alle applicazioni sull'istanza tramite i relativi metadati dell'istanza. Questo è il modo più sicuro per gestire le credenziali per un'applicazione di consumo in esecuzione in un'istanza EC2.

Nell'esempio seguente si configura la KCL per elaborare un flusso di dati Kinesis denominato `kclnodejssample` utilizzando il processore di record fornito in `sample_kcl_app.js`:

```
# The Node.js executable script
executableName = node sample_kcl_app.js
# The name of an Amazon Kinesis stream to process
streamName = kclnodejssample
# Unique KCL application name
applicationName = kclnodejssample
# Use default AWS credentials provider chain
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
# Read from the beginning of the stream
initialPositionInStream = TRIM_HORIZON
```

# Sviluppa un utente di Kinesis Client Library in .NET
<a name="kinesis-record-processor-implementation-app-dotnet"></a>

**Importante**  
Le versioni 1.x e 2.x di Amazon KCL sono obsolete. KCL 1.x sarà disponibile il 30 end-of-support gennaio 2026. **Ti consigliamo vivamente di** migrare le tue applicazioni KCL utilizzando la versione 1.x all'ultima versione di KCL prima del 30 gennaio 2026. Per trovare la versione più recente di KCL, consulta la pagina [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) su. GitHub Per informazioni sulle ultime versioni di KCL, consulta. [Usa la libreria client Kinesis](kcl.md) Per informazioni sulla migrazione da KCL 1.x a KCL 3.x, consulta. [Migrazione di KLC da 1.x a 3.x](kcl-migration-1-3.md)

È possibile utilizzare la Kinesis Client Library (KCL) per creare applicazioni che elaborano dati dai tuoi flussi di dati Kinesis. La Kinesis Client Library è disponibile in più linguaggi. In questo argomento viene discusso .NET.

KCL è una libreria Java; il supporto per linguaggi diversi da Java viene fornito utilizzando un'interfaccia multilingue chiamata. *MultiLangDaemon* Questo daemon è basato su Java e viene eseguito in background quando si utilizza un linguaggio KCL diverso da Java. Pertanto, se installi KCL per .NET e scrivi la tua app consumer interamente in .NET, avrai comunque bisogno che Java sia installato sul tuo sistema a causa di. MultiLangDaemon Inoltre, MultiLangDaemon presenta alcune impostazioni predefinite che potresti dover personalizzare in base al tuo caso d'uso, ad esempio la AWS regione a cui si connette. Per ulteriori informazioni su MultiLangDaemon on GitHub, vai alla pagina del [ MultiLangDaemon progetto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Per scaricare il.NET KCL da GitHub, vai a [Kinesis Client Library (](https://github.com/awslabs/amazon-kinesis-client-net).NET). Per scaricare il codice di esempio per un'applicazione consumer di.NET KCL, vai alla pagina del progetto [KCL for .NET sample consumer](https://github.com/awslabs/amazon-kinesis-client-net/tree/master/SampleConsumer) su. GitHub

È necessario completare le seguenti attività durante l'implementazione di un'applicazione consumer KCL in .NET:

**Topics**
+ [Implementa i metodi della classe IRecord Processor](#kinesis-record-processor-implementation-interface-dotnet)
+ [Modificare le proprietà di configurazione](#kinesis-record-processor-initialization-dotnet)

## Implementa i metodi della classe IRecord Processor
<a name="kinesis-record-processor-implementation-interface-dotnet"></a>

Il consumer deve implementare i seguenti metodi per `IRecordProcessor`. Il consumer di esempio fornisce implementazioni che è possibile utilizzare come punto di partenza (consulta la classe `SampleRecordProcessor` in `SampleConsumer/AmazonKinesisSampleConsumer.cs`).

```
public void Initialize(InitializationInput input)
public void ProcessRecords(ProcessRecordsInput input)
public void Shutdown(ShutdownInput input)
```

**Inizializzazione**  
La KCL chiama questo metodo quando viene creata un'istanza del processore di record, passando un ID della partizione specifico nel parametro `input` (`input.ShardId`). Questo processore di record elabora esclusivamente questo shard e, in genere, è vero anche il contrario (questo shard è elaborato solo da questo processore di record). Tuttavia, il tuo consumer deve tenere conto della possibilità che un record di dati possa essere elaborato più di una volta. Ciò si verifica perché il flusso di dati Kinesis ha una semantica *almeno una volta*, il che significa che ogni record di dati da una partizione viene elaborato almeno una volta da un worker nel tuo consumer. Per ulteriori informazioni sui casi in cui un determinato shard può essere elaborato da più di un lavoratore, consulta [Usa il resharding, lo scaling e l'elaborazione parallela per modificare il numero di shard](kinesis-record-processor-scaling.md).

```
public void Initialize(InitializationInput input)
```

**ProcessRecords**  
La KCL chiama questo metodo e passa una lista di record di dati nel parametro `input` (`input.Records`) dalla partizione specificata dal metodo `Initialize`. Il processore di record che implementi elabora i dati in questi record in base alla semantica del tuo consumer. Ad esempio, il worker potrebbe eseguire una trasformazione dei dati e, successivamente, archiviare il risultato in un bucket Amazon Simple Storage Service (Amazon S3).

```
public void ProcessRecords(ProcessRecordsInput input)
```

Oltre ai dati stessi, il record contiene anche un numero di sequenza e una chiave di partizione. Il lavoratore può utilizzare questi valori quando elabora i dati. Ad esempio, il lavoratore può scegliere il bucket S3 in cui archiviare i dati in base al valore della chiave di partizione. La classe `Record` espone quanto segue per accedere ai dati del record, al numero di sequenza e alla chiave di partizione:

```
byte[] Record.Data 
string Record.SequenceNumber
string Record.PartitionKey
```

Nell'esempio, il metodo `ProcessRecordsWithRetries` ha un codice che mostra in che modo un lavoratore può accedere ai dati del record, al numero di sequenza e alla chiave di partizione.

Il flusso di dati Kinesis richiede che il processore di record tenga traccia dei record che sono già stati elaborati in una partizione. La KCL si occupa di questo monitoraggio per te, passando un oggetto `Checkpointer` a `ProcessRecords` (`input.Checkpointer`). Il processore di record chiama il metodo`Checkpointer.Checkpoint` per comunicare alla KCL quanto si è progredito nell'elaborazione dei record nella partizione. In caso di errore del worker, la KCL utilizza queste informazioni per riavviare l'elaborazione della partizione nell'ultimo record elaborato conosciuto.

Per le operazioni di divisione o unione, la KCL non avvia l'elaborazione delle nuove partizioni fino a quando i processori delle partizioni originali non avranno chiamato `Checkpointer.Checkpoint` per segnalare che l'intera elaborazione delle partizioni originali è completa.

Se non viene passato un parametro, la KCL suppone che la chiamata a `Checkpointer.Checkpoint` significa che tutti i record sono stati elaborati, fino all'ultimo record passato al processore di record. Pertanto, il processore di record deve chiamare `Checkpointer.Checkpoint` solo dopo aver elaborato tutti i record nell'elenco passato al processore. I processori di record non devono chiamare `Checkpointer.Checkpoint` in ciascuna chiamata a `ProcessRecords`. Un processore potrebbe, per esempio, chiamare `Checkpointer.Checkpoint` in ogni terza o quarta chiamata. Puoi specificare, in modo facoltativo, il numero di sequenza esatto di un record come parametro per `Checkpointer.Checkpoint`. In questo caso, la KCL presuppone che tutti i record siano stati elaborati esclusivamente fino a tale record.

Nell'esempio, il metodo privato `Checkpoint(Checkpointer checkpointer)` mostra come effettuare la chiamata al metodo `Checkpointer.Checkpoint` utilizzando la gestione delle eccezioni e la logica dei nuovi tentativi appropriate.

La KCL per .NET gestisce le eccezioni in modo diverso rispetto alle altre biblioteche di linguaggi KCL, in quanto non gestisce eventuali eccezioni generate dall'elaborazione dei record di dati. Le eccezioni non rilevate dal codice dell'utente determinano l'arresto del programma.

**Arresto**  
La KCL chiama il metodo `Shutdown` sia al termine dell'elaborazione (il motivo dell'arresto è `TERMINATE`) che quando il worker non risponde più (il valore di `input.Reason` dell'arresto è `ZOMBIE`).

```
public void Shutdown(ShutdownInput input)
```

L'elaborazione termina quando il processore di record non riceve ulteriori record dallo shard, perché lo shard è stato frazionato o fuso o perché il flusso è stato eliminato.

La KCL trasferisce inoltre un oggetto `Checkpointer` a `shutdown`. Se il motivo dell'arresto è `TERMINATE`, il processore di record deve terminare l'elaborazione di qualsiasi record di dati e, di seguito, chiamare il metodo `checkpoint` in questa interfaccia.

## Modificare le proprietà di configurazione
<a name="kinesis-record-processor-initialization-dotnet"></a>

Il consumer di esempio fornisce valori di default per le proprietà di configurazione. È possibile sostituire una qualsiasi di queste proprietà con i tuoi valori (consulta `SampleConsumer/kcl.properties`).

### Application name (Nome applicazione)
<a name="modify-kinesis-record-processor-application-name"></a>

La KCL richiede un nome dell'applicazione univoco per tutte le applicazioni e per tutte le tabelle Amazon DynamoDB nella stessa Regione. La biblioteca utilizza il valore di configurazione del nome dell'applicazione nei seguenti modi:
+ Si suppone che tutti i lavoratori associati con questo nome dell'applicazione stiano lavorando insieme nello stesso flusso. Questi lavoratori potrebbero essere distribuiti su più istanze. Se si esegue un'istanza aggiuntiva dello stesso codice dell'applicazione, ma con un nome dell'applicazione diverso, la KCL tratta la seconda istanza come un'applicazione completamente separata che opera anch'essa nello stesso flusso.
+ La KCL crea una tabella DynamoDB con il nome dell'applicazione e la utilizza per mantenere le informazioni sullo stato (ad esempio, checkpoint e mappatura worker-partizione) per l'applicazione. Ogni applicazione ha la propria tabella DynamoDB. Per ulteriori informazioni, consulta [Utilizza una tabella di leasing per tenere traccia degli shard elaborati dall'applicazione consumer KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configurare le credenziali
<a name="kinesis-record-processor-creds-dotnet"></a>

È necessario rendere disponibili le AWS credenziali a uno dei provider di credenziali della catena di provider di credenziali predefinita. Puoi utilizzare la proprietà `AWSCredentialsProvider` per impostare un provider di credenziali. Le [proprietà di esempio](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) devono rendere le tue credenziali disponibili per uno dei provider di credenziali nella [catena di provider di credenziali di default](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Se l'applicazione consumer è in esecuzione su un'istanza EC2, consigliamo di configurare l'istanza con un ruolo IAM. Le credenziali AWS che riflettono le autorizzazioni associate a questo ruolo IAM vengono rese disponibili alle applicazioni sull'istanza tramite i relativi metadati dell'istanza. Questo è il modo più sicuro per gestire le credenziali per un consumer in esecuzione in un'istanza EC2.

Il file di proprietà di esempio configura la KCL per elaborare un flusso di dati Kinesis denominato "words" utilizzando il processore di record fornito in `AmazonKinesisSampleConsumer.cs`. 

# Sviluppa un utente della Kinesis Client Library in Python
<a name="kinesis-record-processor-implementation-app-py"></a>

**Importante**  
Le versioni 1.x e 2.x di Amazon KCL sono obsolete. KCL 1.x arriverà il 30 end-of-support gennaio 2026. **Ti consigliamo vivamente di** migrare le tue applicazioni KCL utilizzando la versione 1.x all'ultima versione di KCL prima del 30 gennaio 2026. Per trovare la versione più recente di KCL, consulta la pagina [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) su. GitHub Per informazioni sulle ultime versioni di KCL, consulta. [Usa la libreria client Kinesis](kcl.md) Per informazioni sulla migrazione da KCL 1.x a KCL 3.x, consulta. [Migrazione di KLC da 1.x a 3.x](kcl-migration-1-3.md)

È possibile utilizzare la Kinesis Client Library (KCL) per creare applicazioni che elaborano dati dai tuoi flussi di dati Kinesis. La Kinesis Client Library è disponibile in più linguaggi. In questo argomento viene discusso Python.

KCL è una libreria Java; il supporto per linguaggi diversi da Java viene fornito utilizzando un'interfaccia multilingue chiamata. *MultiLangDaemon* Questo daemon è basato su Java e viene eseguito in background quando si utilizza un linguaggio KCL diverso da Java. Pertanto, se installi KCL per Python e scrivi la tua app consumer interamente in Python, avrai comunque bisogno che Java sia installato sul tuo sistema a causa di. MultiLangDaemon Inoltre, MultiLangDaemon ha alcune impostazioni predefinite che potresti dover personalizzare in base al tuo caso d'uso, ad esempio la AWS regione a cui si connette. Per ulteriori informazioni su MultiLangDaemon on GitHub, vai alla pagina del [ MultiLangDaemon progetto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Per scaricare Python KCL da GitHub, vai alla [Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client-python) (Python). Per scaricare il codice di esempio per un'applicazione consumer Python KCL, vai alla pagina del progetto di esempio [KCL for Python](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples) su. GitHub

È necessario completare le seguenti attività durante l'implementazione di un'applicazione consumer KCL in Python:

**Topics**
+ [Implementa i metodi della classe RecordProcessor](#kinesis-record-processor-implementation-interface-py)
+ [Modificare le proprietà di configurazione](#kinesis-record-processor-initialization-py)

## Implementa i metodi della classe RecordProcessor
<a name="kinesis-record-processor-implementation-interface-py"></a>

La classe `RecordProcess` deve estendere la `RecordProcessorBase` per implementare i seguenti metodi. L'esempio fornisce implementazioni che è possibile utilizzare come punto di partenza (consulta `sample_kclpy_app.py`).

```
def initialize(self, shard_id)
def process_records(self, records, checkpointer)
def shutdown(self, checkpointer, reason)
```

**initialize**  
La KCL chiama il metodo `initialize` quando viene creata un'istanza del processore di record, passando un ID della partizione specifico come parametro. Questo processore di record elabora esclusivamente questo shard e, in genere, è vero anche il contrario (questo shard è elaborato solo da questo processore di record). Tuttavia, il tuo consumer deve tenere conto della possibilità che un record di dati possa essere elaborato più di una volta. Ciò si verifica perché il flusso di dati Kinesis ha una semantica *almeno una volta*, il che significa che ogni record di dati da una partizione viene elaborato almeno una volta da un worker nel tuo consumer. Per ulteriori informazioni sui casi in cui un determinato shard può essere elaborato da più di un lavoratore, consulta [Usa il resharding, lo scaling e l'elaborazione parallela per modificare il numero di shard](kinesis-record-processor-scaling.md).

```
def initialize(self, shard_id)
```

**process\$1records**  
 La KCL chiama questo metodo e passa un elenco di record di dati dalla partizione specificata dal metodo `initialize`. Il processore di record che implementi elabora i dati in questi record in base alla semantica del tuo consumer. Ad esempio, il worker potrebbe eseguire una trasformazione dei dati e, successivamente, archiviare il risultato in un bucket Amazon Simple Storage Service (Amazon S3).

```
def process_records(self, records, checkpointer) 
```

Oltre ai dati stessi, il record contiene anche un numero di sequenza e una chiave di partizione. Il lavoratore può utilizzare questi valori quando elabora i dati. Ad esempio, il lavoratore può scegliere il bucket S3 in cui archiviare i dati in base al valore della chiave di partizione. Il dizionario `record` espone le seguenti coppie chiave-valore per accedere ai dati del record, al numero di sequenza e alla chiave di partizione:

```
record.get('data')
record.get('sequenceNumber')
record.get('partitionKey')
```

Tieni presente che i dati sono codificati in Base64.

Nell'esempio, il metodo `process_records` ha un codice che mostra in che modo un lavoratore può accedere ai dati del record, al numero di sequenza e alla chiave di partizione.

Il flusso di dati Kinesis richiede che il processore di record tenga traccia dei record che sono già stati elaborati in una partizione. La KCL si occupa di questo monitoraggio per te, passando un oggetto `Checkpointer` a `process_records`. Il processore di record chiama il metodo `checkpoint` in questo oggetto per comunicare alla KCL quanto si è progredito nell'elaborazione dei record nella partizione. In caso di errore del worker, la KCL utilizza queste informazioni per riavviare l'elaborazione della partizione nell'ultimo record elaborato conosciuto.

Per le operazioni di divisione o unione, la KCL non avvia l'elaborazione delle nuove partizioni fino a quando i processori delle partizioni originali non avranno chiamato `checkpoint` per segnalare che l'intera elaborazione delle partizioni originali è completa.

Se non viene passato un parametro, la KCL suppone che la chiamata a `checkpoint` significa che tutti i record sono stati elaborati, fino all'ultimo record passato al processore di record. Pertanto, il processore di record deve chiamare `checkpoint` solo dopo aver elaborato tutti i record nell'elenco passato al processore. I processori di record non devono chiamare `checkpoint` in ciascuna chiamata a `process_records`. Un processore potrebbe, per esempio, chiamare `checkpoint` in ogni terza chiamata. Puoi specificare, in modo facoltativo, il numero di sequenza esatto di un record come parametro per `checkpoint`. In questo caso, la KCL presuppone che tutti i record siano stati elaborati esclusivamente fino a tale record.

Nell'esempio, il metodo privato `checkpoint` mostra come effettuare la chiamata al metodo `Checkpointer.checkpoint` utilizzando la gestione delle eccezioni e la logica dei nuovi tentativi appropriate.

La KCL si basa su `process_records` per gestire eventuali eccezioni generate dall'elaborazione dei record di dati. Se viene generata un'eccezione da `process_records`, la KCL omette i record di dati passati a `process_records` prima dell'eccezione. Ciò significa che questi record non sono inviati nuovamente al processore di record che ha generato l'eccezione o a qualsiasi altro processore di record nel consumer.

**shutdown**  
 La KCL chiama il metodo `shutdown` sia al termine dell'elaborazione (il motivo dell'arresto è `TERMINATE`) che quando il worker non risponde più (il `reason` l'arresto è `ZOMBIE`).

```
def shutdown(self, checkpointer, reason)
```

L'elaborazione termina quando il processore di record non riceve ulteriori record dallo shard, perché lo shard è stato frazionato o fuso o perché il flusso è stato eliminato.

 La KCL trasferisce inoltre un oggetto `Checkpointer` a `shutdown`. Se il `reason` dell'arresto è `TERMINATE`, il processore di record deve terminare l'elaborazione di qualsiasi record di dati e, di seguito, chiamare il metodo `checkpoint` in questa interfaccia.

## Modificare le proprietà di configurazione
<a name="kinesis-record-processor-initialization-py"></a>

L'esempio fornisce valori di default per le proprietà di configurazione. È possibile sostituire una qualsiasi di queste proprietà con i tuoi valori (consulta `sample.properties`).

### Application name (Nome applicazione)
<a name="kinesis-record-processor-application-name-py"></a>

La KCL richiede un nome dell'applicazione univoco per tutte le tue applicazioni e per tutte le tabelle Amazon DynamoDB nella stessa Regione. La biblioteca utilizza il valore di configurazione del nome dell'applicazione nei seguenti modi:
+ Si suppone che tutti i lavoratori associati con questo nome dell'applicazione stiano lavorando insieme nello stesso flusso. Questi lavoratori possono essere distribuiti su più istanze. Se si esegue un'istanza aggiuntiva dello stesso codice dell'applicazione, ma con un nome dell'applicazione diverso, la KCL tratta la seconda istanza come un'applicazione completamente separata che opera anch'essa nello stesso flusso.
+ La KCL crea una tabella DynamoDB con il nome dell'applicazione e la utilizza per mantenere le informazioni sullo stato (ad esempio, checkpoint e mappatura worker-partizione) per l'applicazione. Ogni applicazione ha la propria tabella DynamoDB. Per ulteriori informazioni, consulta [Utilizza una tabella di leasing per tenere traccia degli shard elaborati dall'applicazione consumer KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configurare le credenziali
<a name="kinesis-record-processor-creds-py"></a>

È necessario rendere disponibili le AWS credenziali a uno dei provider di credenziali della catena di provider di credenziali predefinita. Puoi utilizzare la proprietà `AWSCredentialsProvider` per impostare un provider di credenziali. Le [proprietà di esempio](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) devono rendere le tue credenziali disponibili per uno dei provider di credenziali nella [catena di provider di credenziali di default](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Se l'applicazione consumer è in esecuzione su un'istanza Amazon EC2, è consigliabile configurare l'istanza con un ruolo IAM. Le credenziali AWS che riflettono le autorizzazioni associate a questo ruolo IAM vengono rese disponibili alle applicazioni sull'istanza tramite i relativi metadati dell'istanza. Questo è il modo più sicuro per gestire le credenziali per un'applicazione di consumo in esecuzione in un'istanza EC2.

Il file di proprietà di esempio configura la KCL per elaborare un flusso di dati Kinesis denominato "words" utilizzando il processore di record fornito in `sample_kclpy_app.py`. 

# Sviluppa un utente di Kinesis Client Library in Ruby
<a name="kinesis-record-processor-implementation-app-ruby"></a>

**Importante**  
Le versioni 1.x e 2.x di Amazon KCL sono obsolete. KCL 1.x arriverà il 30 end-of-support gennaio 2026. **Ti consigliamo vivamente di** migrare le tue applicazioni KCL utilizzando la versione 1.x all'ultima versione di KCL prima del 30 gennaio 2026. Per trovare la versione più recente di KCL, consulta la pagina [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) su. GitHub Per informazioni sulle ultime versioni di KCL, consulta. [Usa la libreria client Kinesis](kcl.md) Per informazioni sulla migrazione da KCL 1.x a KCL 3.x, consulta. [Migrazione di KLC da 1.x a 3.x](kcl-migration-1-3.md)

È possibile utilizzare la Kinesis Client Library (KCL) per creare applicazioni che elaborano dati dai tuoi flussi di dati Kinesis. La Kinesis Client Library è disponibile in più linguaggi. In questo argomento viene discusso Ruby.

KCL è una libreria Java; il supporto per linguaggi diversi da Java viene fornito utilizzando un'interfaccia multilingue chiamata. *MultiLangDaemon* Questo daemon è basato su Java e viene eseguito in background quando si utilizza un linguaggio KCL diverso da Java. Perciò, se installi KCL per Ruby e scrivi la tua app consumer interamente in Ruby, avrai comunque bisogno che Java sia installato sul tuo sistema a causa di. MultiLangDaemon Inoltre, MultiLangDaemon ha alcune impostazioni predefinite che potresti dover personalizzare in base al tuo caso d'uso, ad esempio la AWS regione a cui si connette. Per ulteriori informazioni su MultiLangDaemon on GitHub, vai alla pagina del [ MultiLangDaemon progetto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Per scaricare Ruby KCL da GitHub, vai alla [Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client-ruby) (Ruby). Per scaricare il codice di esempio per un'applicazione consumer Ruby KCL, vai alla pagina del progetto di esempio [KCL](https://github.com/awslabs/amazon-kinesis-client-ruby/tree/master/samples) for Ruby su. GitHub

Per ulteriori informazioni sulla biblioteca di supporto Ruby di KCL, consulta [Documentazione di Ruby Gems KCL](http://www.rubydoc.info/gems/aws-kclrb).

# Sviluppa KCL 2.x Consumers
<a name="developing-consumers-with-kcl-v2"></a>

**Importante**  
Le versioni 1.x e 2.x di Amazon KCL sono obsolete. KCL 1.x arriverà il 30 end-of-support gennaio 2026. **Ti consigliamo vivamente di** migrare le tue applicazioni KCL utilizzando la versione 1.x all'ultima versione di KCL prima del 30 gennaio 2026. Per trovare la versione più recente di KCL, consulta la pagina [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) su. GitHub Per informazioni sulle ultime versioni di KCL, consulta. [Usa la libreria client Kinesis](kcl.md) Per informazioni sulla migrazione da KCL 1.x a KCL 3.x, consulta. [Migrazione di KLC da 1.x a 3.x](kcl-migration-1-3.md)

Questo argomento illustra come utilizzare la versione 2.0 di Kinesis Client Library (KCL). 

Per ulteriori informazioni sulla KCL, consulta la panoramica in [Sviluppo di app consumer utilizzando Kinesis Client Library 1.x](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html).

Scegli tra i seguenti argomenti a seconda dell'opzione che desideri utilizzare.

**Topics**
+ [Sviluppa un utente di Kinesis Client Library in Java](kcl2-standard-consumer-java-example.md)
+ [Sviluppa un utente della Kinesis Client Library in Python](kcl2-standard-consumer-python-example.md)
+ [Sviluppa utenti avanzati con fan-out con KCL 2.x](building-enhanced-consumers-kcl-retired.md)

# Sviluppa un utente di Kinesis Client Library in Java
<a name="kcl2-standard-consumer-java-example"></a>

**Importante**  
Le versioni 1.x e 2.x di Amazon KCL sono obsolete. KCL 1.x arriverà il 30 end-of-support gennaio 2026. **Ti consigliamo vivamente di** migrare le tue applicazioni KCL utilizzando la versione 1.x all'ultima versione di KCL prima del 30 gennaio 2026. Per trovare la versione più recente di KCL, consulta la pagina [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) su. GitHub Per informazioni sulle ultime versioni di KCL, consulta. [Usa la libreria client Kinesis](kcl.md) Per informazioni sulla migrazione da KCL 1.x a KCL 3.x, consulta. [Migrazione di KLC da 1.x a 3.x](kcl-migration-1-3.md)

Il seguente codice mostra un esempio di implementazione in Java di `ProcessorFactory` e `RecordProcessor`. Se vuoi sfruttare la funzionalità fan-out avanzato, consulta [Utilizzo di app Consumer con il fan-out avanzato](https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-kcl-java.html).

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License.
 */


/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

/**
 * This class will run a simple app that uses the KCL to read data and uses the AWS SDK to publish data.
 * Before running this program you must first create a Kinesis stream through the AWS console or AWS SDK.
 */
public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    /**
     * Invoke the main method with 2 args: the stream name and (optionally) the region.
     * Verifies valid inputs and then starts running the app.
     */
    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    /**
     * Constructor sets streamName and region. It also creates a KinesisClient object to send data to Kinesis.
     * This KinesisClient is used to send dummy data so that the consumer has something to read; it is also used
     * indirectly by the KCL to handle the consumption of the data.
     */
    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {

        /**
         * Sends dummy data to Kinesis. Not relevant to consuming the data with the KCL
         */
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        /**
         * Sets up configuration for the KCL, including DynamoDB and CloudWatch dependencies. The final argument, a
         * ShardRecordProcessorFactory, is where the logic for record processing lives, and is located in a private
         * class below.
         */
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        /**
         * The Scheduler (also called Worker in earlier versions of the KCL) is the entry point to the KCL. This
         * instance is configured with defaults provided by the ConfigsBuilder.
         */
        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );

        /**
         * Kickoff the Scheduler. Record processing of the stream of dummy data will continue indefinitely
         * until an exit is triggered.
         */
        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        /**
         * Allows termination of app by pressing Enter.
         */
        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        /**
         * Stops sending dummy data.
         */
        log.info("Cancelling producer and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        /**
         * Stops consuming data. Finishes processing the current batch of data already received from Kinesis
         * before shutting down.
         */
        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown.  Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    /**
     * Sends a single record of dummy data to Kinesis.
     */
    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }

    /**
     * The implementation of the ShardRecordProcessor interface is where the heart of the record processing logic lives.
     * In this example all we do to 'process' is log info about the records.
     */
    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        /**
         * Invoked by the KCL before data records are delivered to the ShardRecordProcessor instance (via
         * processRecords). In this example we do nothing except some logging.
         *
         * @param initializationInput Provides information related to initialization.
         */
        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Handles record processing logic. The Amazon Kinesis Client Library will invoke this method to deliver
         * data records to the application. In this example we simply log our records.
         *
         * @param processRecordsInput Provides the records to be processed as well as information and capabilities
         *                            related to them (e.g. checkpointing).
         */
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /** Called when the lease tied to this record processor has been lost. Once the lease has been lost,
         * the record processor can no longer checkpoint.
         *
         * @param leaseLostInput Provides access to functions and data related to the loss of the lease.
         */
        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Called when all data on this shard has been processed. Checkpointing must occur in the method for record
         * processing to be considered complete; an exception will be thrown otherwise.
         *
         * @param shardEndedInput Provides access to a checkpointer method for completing processing of the shard.
         */
        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Invoked when Scheduler has been requested to shut down (i.e. we decide to stop running the app by pressing
         * Enter). Checkpoints and logs the data a final time.
         *
         * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
         *                               before the shutdown is completed.
         */
        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```

# Sviluppa un utente della Kinesis Client Library in Python
<a name="kcl2-standard-consumer-python-example"></a>

**Importante**  
Le versioni 1.x e 2.x di Amazon KCL sono obsolete. KCL 1.x arriverà il 30 end-of-support gennaio 2026. **Ti consigliamo vivamente di** migrare le tue applicazioni KCL utilizzando la versione 1.x all'ultima versione di KCL prima del 30 gennaio 2026. Per trovare la versione più recente di KCL, consulta la pagina [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) su. GitHub Per informazioni sulle ultime versioni di KCL, consulta. [Usa la libreria client Kinesis](kcl.md) Per informazioni sulla migrazione da KCL 1.x a KCL 3.x, consulta. [Migrazione di KLC da 1.x a 3.x](kcl-migration-1-3.md)

È possibile utilizzare la Kinesis Client Library (KCL) per creare applicazioni che elaborano dati dai tuoi flussi di dati Kinesis. La Kinesis Client Library è disponibile in più linguaggi. In questo argomento viene discusso Python.

KCL è una libreria Java; il supporto per linguaggi diversi da Java viene fornito utilizzando un'interfaccia multilingue chiamata. *MultiLangDaemon* Questo daemon è basato su Java e viene eseguito in background quando si utilizza un linguaggio KCL diverso da Java. Pertanto, se installi KCL per Python e scrivi la tua app consumer interamente in Python, avrai comunque bisogno che Java sia installato sul tuo sistema a causa di. MultiLangDaemon Inoltre, MultiLangDaemon ha alcune impostazioni predefinite che potresti dover personalizzare in base al tuo caso d'uso, ad esempio la AWS regione a cui si connette. Per ulteriori informazioni su MultiLangDaemon on GitHub, vai alla pagina del [ MultiLangDaemon progetto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Per scaricare Python KCL da GitHub, vai alla [Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client-python) (Python). Per scaricare il codice di esempio per un'applicazione consumer Python KCL, vai alla pagina del progetto di esempio [KCL for Python](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples) su. GitHub

È necessario completare le seguenti attività durante l'implementazione di un'applicazione consumer KCL in Python:

**Topics**
+ [Implementa i metodi della classe RecordProcessor](#kinesis-record-processor-implementation-interface-py)
+ [Modificare le proprietà di configurazione](#kinesis-record-processor-initialization-py)

## Implementa i metodi della classe RecordProcessor
<a name="kinesis-record-processor-implementation-interface-py"></a>

La classe `RecordProcess` deve estendere la classe `RecordProcessorBase` per implementare i seguenti metodi:

```
initialize
process_records
shutdown_requested
```

L'esempio fornisce implementazioni che è possibile utilizzare come punto di partenza.

```
#!/usr/bin/env python

# Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Amazon Software License (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://aws.amazon.com/asl/
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

from __future__ import print_function

import sys
import time

from amazon_kclpy import kcl
from amazon_kclpy.v3 import processor


class RecordProcessor(processor.RecordProcessorBase):
    """
    A RecordProcessor processes data from a shard in a stream. Its methods will be called with this pattern:

    * initialize will be called once
    * process_records will be called zero or more times
    * shutdown will be called if this MultiLangDaemon instance loses the lease to this shard, or the shard ends due
        a scaling change.
    """
    def __init__(self):
        self._SLEEP_SECONDS = 5
        self._CHECKPOINT_RETRIES = 5
        self._CHECKPOINT_FREQ_SECONDS = 60
        self._largest_seq = (None, None)
        self._largest_sub_seq = None
        self._last_checkpoint_time = None

    def log(self, message):
        sys.stderr.write(message)

    def initialize(self, initialize_input):
        """
        Called once by a KCLProcess before any calls to process_records

        :param amazon_kclpy.messages.InitializeInput initialize_input: Information about the lease that this record
            processor has been assigned.
        """
        self._largest_seq = (None, None)
        self._last_checkpoint_time = time.time()

    def checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=None):
        """
        Checkpoints with retries on retryable exceptions.

        :param amazon_kclpy.kcl.Checkpointer checkpointer: the checkpointer provided to either process_records
            or shutdown
        :param str or None sequence_number: the sequence number to checkpoint at.
        :param int or None sub_sequence_number: the sub sequence number to checkpoint at.
        """
        for n in range(0, self._CHECKPOINT_RETRIES):
            try:
                checkpointer.checkpoint(sequence_number, sub_sequence_number)
                return
            except kcl.CheckpointError as e:
                if 'ShutdownException' == e.value:
                    #
                    # A ShutdownException indicates that this record processor should be shutdown. This is due to
                    # some failover event, e.g. another MultiLangDaemon has taken the lease for this shard.
                    #
                    print('Encountered shutdown exception, skipping checkpoint')
                    return
                elif 'ThrottlingException' == e.value:
                    #
                    # A ThrottlingException indicates that one of our dependencies is is over burdened, e.g. too many
                    # dynamo writes. We will sleep temporarily to let it recover.
                    #
                    if self._CHECKPOINT_RETRIES - 1 == n:
                        sys.stderr.write('Failed to checkpoint after {n} attempts, giving up.\n'.format(n=n))
                        return
                    else:
                        print('Was throttled while checkpointing, will attempt again in {s} seconds'
                              .format(s=self._SLEEP_SECONDS))
                elif 'InvalidStateException' == e.value:
                    sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n')
                else:  # Some other error
                    sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e))
            time.sleep(self._SLEEP_SECONDS)

    def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
        """
        Called for each record that is passed to process_records.

        :param str data: The blob of data that was contained in the record.
        :param str partition_key: The key associated with this recod.
        :param int sequence_number: The sequence number associated with this record.
        :param int sub_sequence_number: the sub sequence number associated with this record.
        """
        ####################################
        # Insert your processing logic here
        ####################################
        self.log("Record (Partition Key: {pk}, Sequence Number: {seq}, Subsequence Number: {sseq}, Data Size: {ds}"
                 .format(pk=partition_key, seq=sequence_number, sseq=sub_sequence_number, ds=len(data)))

    def should_update_sequence(self, sequence_number, sub_sequence_number):
        """
        Determines whether a new larger sequence number is available

        :param int sequence_number: the sequence number from the current record
        :param int sub_sequence_number: the sub sequence number from the current record
        :return boolean: true if the largest sequence should be updated, false otherwise
        """
        return self._largest_seq == (None, None) or sequence_number > self._largest_seq[0] or \
            (sequence_number == self._largest_seq[0] and sub_sequence_number > self._largest_seq[1])

    def process_records(self, process_records_input):
        """
        Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers
        from the records to indicate where in the stream to checkpoint.

        :param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the
            records.
        """
        try:
            for record in process_records_input.records:
                data = record.binary_data
                seq = int(record.sequence_number)
                sub_seq = record.sub_sequence_number
                key = record.partition_key
                self.process_record(data, key, seq, sub_seq)
                if self.should_update_sequence(seq, sub_seq):
                    self._largest_seq = (seq, sub_seq)

            #
            # Checkpoints every self._CHECKPOINT_FREQ_SECONDS seconds
            #
            if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS:
                self.checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1])
                self._last_checkpoint_time = time.time()

        except Exception as e:
            self.log("Encountered an exception while processing records. Exception was {e}\n".format(e=e))

    def lease_lost(self, lease_lost_input):
        self.log("Lease has been lost")

    def shard_ended(self, shard_ended_input):
        self.log("Shard has ended checkpointing")
        shard_ended_input.checkpointer.checkpoint()

    def shutdown_requested(self, shutdown_requested_input):
        self.log("Shutdown has been requested, checkpointing.")
        shutdown_requested_input.checkpointer.checkpoint()


if __name__ == "__main__":
    kcl_process = kcl.KCLProcess(RecordProcessor())
    kcl_process.run()
```

## Modificare le proprietà di configurazione
<a name="kinesis-record-processor-initialization-py"></a>

L'esempio fornisce valori di default per le proprietà di configurazione, come mostra lo script seguente. È possibile sostituire una qualsiasi di queste proprietà con i propri valori.

```
# The script that abides by the multi-language protocol. This script will
# be executed by the MultiLangDaemon, which will communicate with this script
# over STDIN and STDOUT according to the multi-language protocol.
executableName = sample_kclpy_app.py

# The name of an Amazon Kinesis stream to process.
streamName = words

# Used by the KCL as the name of this application. Will be used as the name
# of an Amazon DynamoDB table which will store the lease and checkpoint
# information for workers with this application name
applicationName = PythonKCLSample

# Users can change the credentials provider the KCL will use to retrieve credentials.
# The DefaultAWSCredentialsProviderChain checks several other providers, which is
# described here:
# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain

# Appended to the user agent of the KCL. Does not impact the functionality of the
# KCL in any other way.
processingLanguage = python/2.7

# Valid options at TRIM_HORIZON or LATEST.
# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
initialPositionInStream = TRIM_HORIZON

# The following properties are also available for configuring the KCL Worker that is created
# by the MultiLangDaemon.

# The KCL defaults to us-east-1
#regionName = us-east-1

# Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
# will be regarded as having problems and it's shards will be assigned to other workers.
# For applications that have a large number of shards, this msy be set to a higher number to reduce
# the number of DynamoDB IOPS required for tracking leases
#failoverTimeMillis = 10000

# A worker id that uniquely identifies this worker among all workers using the same applicationName
# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself.
#workerId = 

# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
#shardSyncIntervalMillis = 60000

# Max records to fetch from Kinesis in a single GetRecords call.
#maxRecords = 10000

# Idle time between record reads in milliseconds.
#idleTimeBetweenReadsInMillis = 1000

# Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while)
#callProcessRecordsEvenForEmptyRecordList = false

# Interval in milliseconds between polling to check for parent shard completion.
# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
# completion of parent shards).
#parentShardPollIntervalMillis = 10000

# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
# to delete the ones we don't need any longer.
#cleanupLeasesUponShardCompletion = true

# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
#taskBackoffTimeMillis = 500

# Buffer metrics for at most this long before publishing to CloudWatch.
#metricsBufferTimeMillis = 10000

# Buffer at most this many metrics before publishing to CloudWatch.
#metricsMaxQueueSize = 10000

# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
# to RecordProcessorCheckpointer#checkpoint(String) by default.
#validateSequenceNumberBeforeCheckpointing = true

# The maximum number of active threads for the MultiLangDaemon to permit.
# If a value is provided then a FixedThreadPool is used with the maximum
# active threads set to the provided value. If a non-positive integer or no
# value is provided a CachedThreadPool is used.
#maxActiveThreads = 0
```

### Application name (Nome applicazione)
<a name="kinesis-record-processor-application-name-py"></a>

La KCL richiede un nome dell'applicazione univoco per tutte le applicazioni e per tutte le tabelle Amazon DynamoDB nella stessa Regione. La biblioteca utilizza il valore di configurazione del nome dell'applicazione nei seguenti modi:
+ Si suppone che tutti i lavoratori associati con questo nome dell'applicazione stiano lavorando insieme nello stesso flusso. Questi lavoratori possono essere distribuiti su più istanze. Se si esegue un'istanza aggiuntiva dello stesso codice dell'applicazione, ma con un nome dell'applicazione diverso, la KCL tratta la seconda istanza come un'applicazione completamente separata che opera anch'essa nello stesso flusso.
+ La KCL crea una tabella DynamoDB con il nome dell'applicazione e la utilizza per mantenere le informazioni sullo stato (ad esempio, checkpoint e mappatura worker-partizione) per l'applicazione. Ogni applicazione ha la propria tabella DynamoDB. Per ulteriori informazioni, consulta [Utilizza una tabella di leasing per tenere traccia degli shard elaborati dall'applicazione consumer KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Credenziali
<a name="kinesis-record-processor-creds-py"></a>

È necessario rendere disponibili le AWS credenziali a uno dei provider di credenziali della catena di provider di [credenziali predefinita](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Puoi utilizzare la proprietà `AWSCredentialsProvider` per impostare un provider di credenziali. Se esegui la tua applicazione consumer su un'istanza Amazon EC2, ti consigliamo di configurare l'istanza con un ruolo IAM. AWS le credenziali che riflettono le autorizzazioni associate a questo ruolo IAM vengono rese disponibili alle applicazioni sull'istanza tramite i relativi metadati dell'istanza. Questo è il modo più sicuro per gestire le credenziali per un'applicazione di consumo in esecuzione in un'istanza EC2.

# Sviluppa utenti avanzati con fan-out con KCL 2.x
<a name="building-enhanced-consumers-kcl-retired"></a>

**Importante**  
Le versioni 1.x e 2.x di Amazon KCL sono obsolete. KCL 1.x end-of-support arriverà il 30 gennaio 2026. **Ti consigliamo vivamente di** migrare le tue applicazioni KCL utilizzando la versione 1.x all'ultima versione di KCL prima del 30 gennaio 2026. Per trovare la versione più recente di KCL, consulta la pagina [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) su. GitHub Per informazioni sulle ultime versioni di KCL, consulta. [Usa la libreria client Kinesis](kcl.md) Per informazioni sulla migrazione da KCL 1.x a KCL 3.x, consulta. [Migrazione di KLC da 1.x a 3.x](kcl-migration-1-3.md)

Le applicazioni consumer che utilizzano il *fan-out avanzato* in Flusso di dati Amazon Kinesis possono ricevere record da un flusso di dati con velocità di trasmissione effettiva dedicata fino a 2 MiB di dati al secondo per partizione. Questo tipo di applicazioni consumer non sono in competizione con altre applicazioni che ricevono dati dal flusso. Per ulteriori informazioni, consulta [Sviluppa consumatori con fan-out migliorati con un throughput dedicato](enhanced-consumers.md).

È possibile utilizzare la versione 2.0 o una successiva della Kinesis Client Library (KCL) per sviluppare applicazioni che usano il fan-out avanzato per ricevere dati dai flussi. KCL sottoscrive automaticamente l'applicazione a tutti gli shard di uno stream e garantisce che l'applicazione consumer sia in grado di leggere con un valore di throughput di 2 per shard. MB/sec Se desideri utilizzare la KCL senza attivare la funzionalità fan-out avanzato, consulta [Sviluppo di applicazioni Consumer tramite la Kinesis Client Library 2.0](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl-v2.html).

**Topics**
+ [Sviluppa utenti fan-out avanzati utilizzando KCL 2.x in Java](building-enhanced-consumers-kcl-java.md)

# Sviluppa utenti fan-out avanzati utilizzando KCL 2.x in Java
<a name="building-enhanced-consumers-kcl-java"></a>

**Importante**  
Le versioni 1.x e 2.x di Amazon KCL sono obsolete. KCL 1.x end-of-support arriverà il 30 gennaio 2026. **Ti consigliamo vivamente di** migrare le tue applicazioni KCL utilizzando la versione 1.x all'ultima versione di KCL prima del 30 gennaio 2026. Per trovare la versione più recente di KCL, consulta la pagina [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) su. GitHub Per informazioni sulle ultime versioni di KCL, consulta. [Usa la libreria client Kinesis](kcl.md) Per informazioni sulla migrazione da KCL 1.x a KCL 3.x, consulta. [Migrazione di KLC da 1.x a 3.x](kcl-migration-1-3.md)

È possibile utilizzare la versione 2.0 o una successiva della Kinesis Client Library (KCL) per sviluppare applicazioni in Flusso di dati Amazon Kinesis per ricevere dati dai flussi tramite il fan-out avanzato. Il seguente codice mostra un esempio di implementazione in Java di `ProcessorFactory` e `RecordProcessor`.

Si consiglia di utilizzare `KinesisClientUtil` per creare `KinesisAsyncClient` e configurare `maxConcurrency` in `KinesisAsyncClient`.

**Importante**  
Il client Amazon Kinesis potrebbe presentare un aumento significativo della latenza, a meno che non venga configurato `KinesisAsyncClient` per avere una `maxConcurrency` sufficientemente alta per consentire tutti i canoni più ulteriori utilizzi di `KinesisAsyncClient`.

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License. 
 */

/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        log.info("Cancelling producer, and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown. Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }


    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```

# Migra i consumatori da KCL 1.x a KCL 2.x
<a name="kcl-migration"></a>

**Importante**  
Le versioni 1.x e 2.x di Amazon KCL sono obsolete. KCL 1.x arriverà il 30 gennaio 2026. end-of-support **Ti consigliamo vivamente di** migrare le tue applicazioni KCL utilizzando la versione 1.x all'ultima versione di KCL prima del 30 gennaio 2026. Per trovare la versione più recente di KCL, consulta la pagina [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) su. GitHub Per informazioni sulle ultime versioni di KCL, consulta. [Usa la libreria client Kinesis](kcl.md) Per informazioni sulla migrazione da KCL 1.x a KCL 3.x, consulta. [Migrazione di KLC da 1.x a 3.x](kcl-migration-1-3.md)

Questo argomento descrive le differenze tra le versioni 1.x e 2.x della Kinesis Client Library (KCL). Viene inoltre illustrato come migrare l'applicazione consumer dalla versione 1.x alla versione 2.x della KCL. Dopo la migrazione, il client avvierà l'elaborazione dei record dall'ultimo punto di controllo verificato.

La versione 2.0 della KCL introduce le seguenti modifiche di interfaccia:


**Modifiche di interfaccia KCL**  

| Interfaccia KCL 1.x | Interfaccia KCL 2.0 | 
| --- | --- | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor | software.amazon.kinesis.processor.ShardRecordProcessor | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory | software.amazon.kinesis.processor.ShardRecordProcessorFactory | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware | Piegata in software.amazon.kinesis.processor.ShardRecordProcessor | 

**Topics**
+ [Migrare il processore di registrazione](#recrod-processor-migration)
+ [Eseguire la migrazione della fabbrica del processore di registrazione](#recrod-processor-factory-migration)
+ [Migrare il lavoratore](#worker-migration)
+ [Configurazione del client Amazon Kinesis](#client-configuration)
+ [Rimozione dei tempi di inattività](#idle-time-removal)
+ [Rimozioni della configurazione del client](#client-configuration-removals)

## Migrare il processore di registrazione
<a name="recrod-processor-migration"></a>

L'esempio seguente mostra un elaboratore di record implementato per &KCL; 1.x:

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        //
        // Process records, and possibly checkpoint
        //
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**Per migrare la classe dell'elaboratore di record**

1. Modifica le interfacce da `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` e `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` verso `software.amazon.kinesis.processor.ShardRecordProcessor`, come segue:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
   public class TestRecordProcessor implements ShardRecordProcessor {
   ```

1. Aggiorna le istruzioni `import` per i metodi `initialize` e `processRecords`.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
   ```

1. Sostituisci il metodo `shutdown` con i seguenti nuovi metodi: `leaseLost`, `shardEnded` e `shutdownRequested`.

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

Segue la versione aggiornata della classe dell'elaboratore di record.

```
package com.amazonaws.kcl;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class TestRecordProcessor implements ShardRecordProcessor {
    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

## Eseguire la migrazione della fabbrica del processore di registrazione
<a name="recrod-processor-factory-migration"></a>

La fabbrica dell'elaboratore di record è responsabile per la creazione di elaboratori di record quando un lease è acquisito. Di seguito è illustrato un esempio di una fabbrica &KCL; 1.x.

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class TestRecordProcessorFactory implements IRecordProcessorFactory {
    @Override
    public IRecordProcessor createProcessor() {
        return new TestRecordProcessor();
    }
}
```

**Per migrare la fabbrica dell'elaboratore di record**

1. Modifica l'interfaccia implementata da `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` a `software.amazon.kinesis.processor.ShardRecordProcessorFactory`, come segue.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
   
   // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
   public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
   ```

1. Modifica la firma di ritorno per `createProcessor`.

   ```
   // public IRecordProcessor createProcessor() {
   public ShardRecordProcessor shardRecordProcessor() {
   ```

Di seguito è riportato un esempio di fabbrica di elaboratore di record in 2.0:

```
package com.amazonaws.kcl;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new TestRecordProcessor();
    }
}
```

## Migrare il lavoratore
<a name="worker-migration"></a>

Nella versione 2.0 della KCL, una nuova classe, denominata `Scheduler`, sostituisce la classe `Worker`. Di seguito è illustrato un esempio di un worker di KCL 1.x.

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

**Per migrare il lavoratore**

1. Modifica la dichiarazione `import` per la classe `Worker` nelle dichiarazioni di importazione delle classi `Scheduler` e `ConfigsBuilder`.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. Crea `ConfigsBuilder` e `Scheduler` come mostrato nell'esempio seguente.

   Si consiglia di utilizzare `KinesisClientUtil` per creare `KinesisAsyncClient` e configurare `maxConcurrency` in `KinesisAsyncClient`.
**Importante**  
Il client Amazon Kinesis potrebbe presentare un aumento significativo della latenza, a meno che non venga configurato `KinesisAsyncClient` per avere una `maxConcurrency` sufficientemente alta per consentire tutti i canoni più ulteriori utilizzi di `KinesisAsyncClient`.

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   import software.amazon.kinesis.common.ConfigsBuilder;
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
   Region region = Region.AP_NORTHEAST_2;
   KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
   ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());
   
   Scheduler scheduler = new Scheduler(
       configsBuilder.checkpointConfig(),
       configsBuilder.coordinatorConfig(),
       configsBuilder.leaseManagementConfig(),
       configsBuilder.lifecycleConfig(),
       configsBuilder.metricsConfig(),
       configsBuilder.processorConfig(),
       configsBuilder.retrievalConfig()
       );
   ```

## Configurazione del client Amazon Kinesis
<a name="client-configuration"></a>

Con il rilascio 2.0 della Kinesis Client Library, la configurazione del client si è spostata da una singola classe di configurazione (`KinesisClientLibConfiguration`) a sei classi di configurazione. La tabella seguente descrive la migrazione.


**Campi di configurazione e relative nuove classi**  

| Campo originale | Nuova classe di configurazione | Description | 
| --- | --- | --- | 
| applicationName | ConfigsBuilder | Il nome per l'applicazione della KCL. Utilizzato come predefinito per tableName e consumerName. | 
| tableName | ConfigsBuilder | Consente di ignorare il nome della tabella utilizzato per la tabella di lease di Amazon DynamoDB. | 
| streamName | ConfigsBuilder | Il nome del flusso dal quale l'applicazione elabora i record. | 
| kinesisEndpoint | ConfigsBuilder | Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni. | 
| dynamoDBEndpoint | ConfigsBuilder | Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni. | 
| initialPositionInStreamExtended | RetrievalConfig | La posizione nello shard da cui il KCL inizia a recuperare i record, a partire dall'esecuzione iniziare all'applicazione. | 
| kinesisCredentialsProvider | ConfigsBuilder | Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni. | 
| dynamoDBCredentialsProvider | ConfigsBuilder | Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni. | 
| cloudWatchCredentialsProvider | ConfigsBuilder | Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni. | 
| failoverTimeMillis | LeaseManagementConfig | Il numero di millisecondi che devono passare prima di poter considerare un proprietario di lease come fallito. | 
| workerIdentifier | ConfigsBuilder | Un identificatore univoco che rappresenta la creazione dell'elaboratore di applicazione. Deve essere univoco. | 
| shardSyncIntervalMillis | LeaseManagementConfig | Il periodo di tempo tra le chiamate di sincronizzazione dello shard. | 
| maxRecords | PollingConfig | Consente di impostare il numero massimo di record restituiti da Kinesis. | 
| idleTimeBetweenReadsInMillis | CoordinatorConfig | Questa opzione è stata eliminata. Vedi rimozione tempo di inattività. | 
| callProcessRecordsEvenForEmptyRecordList | ProcessorConfig | Quando impostato, l'elaboratore di record viene chiamato anche quando nessun record è stato fornito da . | 
| parentShardPollIntervalMillis | CoordinatorConfig | Con quale frequenza un elaboratore di record deve eseguire il polling per vedere se il shard padre è stata completato. | 
| cleanupLeasesUponShardCompletion | LeaseManagementConfig | Quando impostati, i lease vengono rimossi non appena i lease figlio hanno iniziato l'elaborazione. | 
| ignoreUnexpectedChildShards | LeaseManagementConfig | Quando impostato, i shard figlio che hanno un shard aperto vengono ignorati. Questo è principalmente per DynamoDB Streams. | 
| kinesisClientConfig | ConfigsBuilder | Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni. | 
| dynamoDBClientConfig | ConfigsBuilder | Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni. | 
| cloudWatchClientConfig | ConfigsBuilder | Questa opzione è stata eliminata. Configurazione client, vedere la sezione Rimozioni. | 
| taskBackoffTimeMillis | LifecycleConfig | Il tempo di attesa per riprovare operazioni non riuscite. | 
| metricsBufferTimeMillis | MetricsConfig | Controlla la pubblicazione CloudWatch delle metriche. | 
| metricsMaxQueueSize | MetricsConfig | Controlla la pubblicazione CloudWatch delle metriche. | 
| metricsLevel | MetricsConfig | Controlla la pubblicazione CloudWatch delle metriche. | 
| metricsEnabledDimensions | MetricsConfig | Controlla la pubblicazione CloudWatch delle metriche. | 
| validateSequenceNumberBeforeCheckpointing | CheckpointConfig | Questa opzione è stata eliminata. Vedi la convalida del numero di sequenza del checkpoint. | 
| regionName | ConfigsBuilder | Questa opzione è stata eliminata. Vedi la rimozione della configurazione client. | 
| maxLeasesForWorker | LeaseManagementConfig | Il numero massimo di lease che una singola istanza dell'applicazione deve accettare. | 
| maxLeasesToStealAtOneTime | LeaseManagementConfig | Il numero massimo di lease che un'applicazione deve tentare di intercettare simultaneamente. | 
| initialLeaseTableReadCapacity | LeaseManagementConfig | La IOPs lettura DynamoDB utilizzata se la Kinesis Client Library deve creare una nuova tabella di lease DynamoDB. | 
| initialLeaseTableWriteCapacity | LeaseManagementConfig | La IOPs lettura DynamoDB utilizzata se la Kinesis Client Library deve creare una nuova tabella di lease DynamoDB. | 
| initialPositionInStreamExtended | LeaseManagementConfig | La posizione iniziale nel flusso nella quale l'applicazione dovrebbe iniziare. Questo viene utilizzato soltanto durante la creazione del lease iniziale. | 
| skipShardSyncAtWorkerInitializationIfLeasesExist | CoordinatorConfig | Disabilita la sincronizzazione dei dati shard se la tabella di lease contiene lease esistenti. DA KinesisEco FARE: -438 | 
| shardPrioritization | CoordinatorConfig | Quale prioritizzazione shard utilizzare. | 
| shutdownGraceMillis | N/D | Questa opzione è stata eliminata. Vedi Traslochi MultiLang . | 
| timeoutInSeconds | N/D | Questa opzione è stata eliminata. Vedi MultiLang Rimozioni. | 
| retryGetRecordsInSeconds | PollingConfig | Configura il ritardo tra i GetRecords tentativi di errore. | 
| maxGetRecordsThreadPool | PollingConfig | La dimensione del pool di thread utilizzato per. GetRecords | 
| maxLeaseRenewalThreads | LeaseManagementConfig | Controlla le dimensioni del pool di thread di rinnovo del lease. Quanto maggiori sono i lease che può richiedere l'applicazione, tanto più grande deve essere questo pool. | 
| recordsFetcherFactory | PollingConfig | Consente di sostituire la factory utilizzata per creare fetcher che recuperano dai flussi. | 
| logWarningForTaskAfterMillis | LifecycleConfig | Quanto tempo bisogna attendere prima che venga registrato un avviso se un'attività non è stata completata. | 
| listShardsBackoffTimeInMillis | RetrievalConfig | Il numero di millisecondi di attesa tra le chiamate in ListShards quando si verificano errori. | 
| maxListShardsRetryAttempts | RetrievalConfig | Il numero massimo di volte che ListShards effettua nuovi tentativi prima di desistere. | 

## Rimozione dei tempi di inattività
<a name="idle-time-removal"></a>

Nella versione 1.x di &KCL;, `idleTimeBetweenReadsInMillis` corrispondeva a due quantità: 
+ La quantità di tempo tra controlli dell'attività. È ora possibile configurare questo periodo tra attività impostando `CoordinatorConfig#shardConsumerDispatchPollIntervalMillis`.
+ La quantità di tempo di sospensione quando non è stato restituito alcun record da . Nella versione 2.0, nel fan-out ottimizzato, i record vengono inviati da chi li ha recuperati. L'attività sul consumo di shard avviene solo quando arriva una richiesta di push. 

## Rimozioni della configurazione del client
<a name="client-configuration-removals"></a>

Nella versione 2.0, la KCL non crea più client. Spetta all'utente fornire un client valido. Con questa modifica, tutti i parametri di configurazione che controllavano la creazione del client sono stati rimossi. Se hai bisogno di questi parametri, puoi impostarli sui client prima di fornire i client a `ConfigsBuilder`.


****  

| Campo rimosso | Configurazione equivalente | 
| --- | --- | 
| kinesisEndpoint | Configura SDK KinesisAsyncClient con l'endpoint preferito: KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis endpoint>")).build(). | 
| dynamoDBEndpoint | Configura SDK DynamoDbAsyncClient con l'endpoint preferito: DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb endpoint>")).build(). | 
| kinesisClientConfig | Configura SDK KinesisAsyncClient con la configurazione necessaria: KinesisAsyncClient.builder().overrideConfiguration(<your configuration>).build(). | 
| dynamoDBClientConfig | Configura SDK DynamoDbAsyncClient con la configurazione necessaria: DynamoDbAsyncClient.builder().overrideConfiguration(<your configuration>).build(). | 
| cloudWatchClientConfig | Configura SDK CloudWatchAsyncClient con la configurazione necessaria: CloudWatchAsyncClient.builder().overrideConfiguration(<your configuration>).build(). | 
| regionName | Configura SDK con la regione preferita. Questo è uguale per tutti i client SDK. Ad esempio, KinesisAsyncClient.builder().region(Region.US\$1WEST\$12).build(). | 