

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Sviluppa i consumatori con KCL in Java
<a name="develop-kcl-consumers-java"></a>

## Prerequisiti
<a name="develop-kcl-consumers-java-prerequisites"></a>

Prima di iniziare a utilizzare KCL 3.x, assicurati di disporre di quanto segue:
+ Java Development Kit (JDK) 8 o versione successiva
+ AWS SDK per Java 2.x
+ Maven o Gradle per la gestione delle dipendenze

KCL raccoglie i parametri di utilizzo della CPU, come l'utilizzo della CPU, dall'host di elaborazione su cui lavorano i lavoratori per bilanciare il carico e raggiungere un livello di utilizzo uniforme delle risorse tra i lavoratori. Per consentire a KCL di raccogliere i parametri di utilizzo della CPU dai lavoratori, è necessario soddisfare i seguenti prerequisiti:

 **Amazon Elastic Compute Cloud(Amazon EC2)**
+ Il sistema operativo deve essere Linux OS.
+ È necessario abilitarlo [IMDSv2](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html)nella propria istanza EC2.

 **Amazon Elastic Container Service (Amazon ECS) su Amazon EC2**
+ Il sistema operativo deve essere Linux OS.
+ È necessario abilitare l'endpoint [ECS Task Metadata Endpoint versione 4](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ec2-metadata.html). 
+ La versione dell'agente container Amazon ECS deve essere 1.39.0 o successiva.

 **Amazon ECS su AWS Fargate**
+ È necessario abilitare l'[endpoint dei metadati delle attività Fargate](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4-fargate.html) versione 4. Se si utilizza la versione 1.4.0 o successiva della piattaforma Fargate, questa opzione è abilitata per impostazione predefinita. 
+ Piattaforma Fargate versione 1.4.0 o successiva.

 **Amazon Elastic Kubernetes Service (Amazon EKS) su Amazon EC2** 
+ Il sistema operativo deve essere Linux OS.

 **Amazon EKS su AWS Fargate**
+ Piattaforma Fargate 1.3.0 o versione successiva.

**Importante**  
Se KCL non è in grado di raccogliere i parametri di utilizzo della CPU dai lavoratori, ricorrerà alla velocità effettiva per lavoratore per assegnare i contratti di locazione e bilanciare il carico tra i lavoratori della flotta. Per ulteriori informazioni, consulta [In che modo KCL assegna i contratti di locazione ai lavoratori e bilancia il carico](kcl-dynamoDB.md#kcl-assign-leases).

## Installa e aggiungi dipendenze
<a name="develop-kcl-consumers-java-installation"></a>

Se usi Maven, aggiungi la seguente dipendenza al tuo file. `pom.xml` Assicurati di aver sostituito 3.x.x con l'ultima versione di KCL. 

```
<dependency>
    <groupId>software.amazon.kinesis</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>3.x.x</version> <!-- Use the latest version -->
</dependency>
```

Se stai usando Gradle, aggiungi quanto segue al tuo file. `build.gradle` Assicurati di aver sostituito 3.x.x con l'ultima versione di KCL. 

```
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
```

[Puoi verificare la versione più recente di KCL sul Maven Central Repository.](https://search.maven.org/artifact/software.amazon.kinesis/amazon-kinesis-client)

## Implementa il consumatore
<a name="develop-kcl-consumers-java-implemetation"></a>

Un'applicazione consumer KCL è composta dai seguenti componenti chiave:

**Topics**
+ [RecordProcessor](#implementation-recordprocessor)
+ [RecordProcessorFactory](#implementation-recordprocessorfactory)
+ [Pianificatore](#implementation-scheduler)
+ [Applicazione principale per i consumatori](#implementation-main)

### RecordProcessor
<a name="implementation-recordprocessor"></a>

RecordProcessor è il componente principale in cui risiede la logica aziendale per l'elaborazione dei record del flusso di dati Kinesis. Definisce in che modo l'applicazione elabora i dati che riceve dal flusso Kinesis.

Responsabilità chiave:
+ Inizializza l'elaborazione per uno shard
+ Elabora batch di record dal flusso Kinesis
+ Arresta l'elaborazione di uno shard (ad esempio, quando lo shard si divide o si fonde o il leasing viene ceduto a un altro host)
+ Gestisci i checkpoint per monitorare i progressi

Di seguito viene illustrato un esempio di implementazione:

```
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.*;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class SampleRecordProcessor implements ShardRecordProcessor {
    private static final String SHARD_ID_MDC_KEY = "ShardId";
    private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);
    private String shardId;

    @Override
    public void initialize(InitializationInput initializationInput) {
        shardId = initializationInput.shardId();
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Processing {} record(s)", processRecordsInput.records().size());
            processRecordsInput.records().forEach(r -> 
                log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber())
            );
            
            // Checkpoint periodically
            processRecordsInput.checkpointer().checkpoint();
        } catch (Throwable t) {
            log.error("Caught throwable while processing records. Aborting.", t);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Lost lease, so terminating.");
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Reached shard end checkpointing.");
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at shard end. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Scheduler is shutting down, checkpointing.");
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }
}
```

Di seguito è riportata una spiegazione dettagliata di ogni metodo utilizzato nell'esempio:

**inizializza (InitializationInputinitializationInput)**
+ Scopo: impostare le risorse o lo stato necessari per l'elaborazione dei record.
+ Quando viene chiamato: una volta, quando KCL assegna uno shard a questo registratore.
+ Punti chiave:
  + `initializationInput.shardId()`: L'ID dello shard che questo processore gestirà.
  + `initializationInput.extendedSequenceNumber()`: Il numero di sequenza da cui iniziare l'elaborazione.

**ProcessRecords () ProcessRecordsInput processRecordsInput**
+ Scopo: Elaborare i record in entrata e, facoltativamente, controllare l'avanzamento del checkpoint.
+ Quando viene chiamato: ripetutamente, purché il processore di dischi detenga il contratto di locazione dello shard.
+ Punti chiave:
  + `processRecordsInput.records()`: Elenco dei record da elaborare.
  + `processRecordsInput.checkpointer()`: utilizzato per controllare lo stato di avanzamento.
  + Assicurati di aver gestito tutte le eccezioni durante l'elaborazione per evitare che KCL fallisca.
  + Questo metodo dovrebbe essere idempotente, poiché lo stesso record può essere elaborato più di una volta in alcuni scenari, ad esempio dati che non sono stati sottoposti a checkpoint prima che un lavoratore si blocchi o riavvii imprevisti.
  + Svuota sempre tutti i dati memorizzati nel buffer prima del checkpoint per garantire la coerenza dei dati.

**LeaseLostInput leaseLostInputleaseLost ()**
+ Scopo: ripulire tutte le risorse specifiche per l'elaborazione di questo frammento.
+ Quando viene chiamato: quando un altro Scheduler assume il contratto di locazione di questo shard.
+ Punti chiave:
  + Il checkpoint non è consentito con questo metodo.

**shardEnded () ShardEndedInput shardEndedInput**
+ Scopo: completare l'elaborazione di questo frammento e di questo checkpoint.
+ Quando viene chiamato: quando lo shard si divide o si fonde, indica che tutti i dati dello shard sono stati elaborati.
+ Punti chiave:
  + `shardEndedInput.checkpointer()`: utilizzato per eseguire il checkpoint finale.
  + Il checkpoint in questo metodo è obbligatorio per completare l'elaborazione.
  + Il mancato ripristino dei dati e del checkpoint in questo campo può comportare la perdita dei dati o un'elaborazione duplicata alla riapertura dello shard.

**ShutdownRequestedInput shutdownRequestedInputShutdownRequested ()**
+ Scopo: controllare e ripulire le risorse quando KCL si spegne.
+ Quando viene chiamato: quando KCL si spegne, ad esempio quando l'applicazione viene chiusa).
+ Punti chiave:
  + `shutdownRequestedInput.checkpointer()`: utilizzato per eseguire il checkpoint prima dello spegnimento.
  + Assicurati di aver implementato il checkpoint nel metodo in modo che i progressi vengano salvati prima dell'interruzione dell'applicazione.
  + Il mancato ripristino dei dati e del checkpoint in questo campo può comportare la perdita dei dati o la rielaborazione dei record al riavvio dell'applicazione.

**Importante**  
KCL 3.x garantisce un minor numero di ritrattamenti dei dati quando il contratto di locazione viene ceduto da un lavoratore a un altro lavoratore effettuando un checkpoint prima che il lavoratore precedente venga chiuso. Se non implementate la logica di checkpoint nel `shutdownRequested()` metodo, non vedrete questo vantaggio. Assicurati di aver implementato una logica di checkpoint all'interno del metodo. `shutdownRequested()`

### RecordProcessorFactory
<a name="implementation-recordprocessorfactory"></a>

RecordProcessorFactory è responsabile della creazione di nuove RecordProcessor istanze. KCL utilizza questa factory per crearne una nuova RecordProcessor per ogni shard che l'applicazione deve elaborare.

Responsabilità chiave:
+ Crea nuove RecordProcessor istanze su richiesta
+ Assicurati che ognuna RecordProcessor sia inizializzata correttamente

Di seguito è riportato un esempio di implementazione:

```
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new SampleRecordProcessor();
    }
}
```

In questo esempio, la factory ne crea una nuova SampleRecordProcessor ogni volta che viene chiamata shardRecordProcessor (). È possibile estenderlo per includere qualsiasi logica di inizializzazione necessaria.

### Pianificatore
<a name="implementation-scheduler"></a>

Scheduler è un componente di alto livello che coordina tutte le attività dell'applicazione KCL. È responsabile dell'orchestrazione complessiva dell'elaborazione dei dati.

Responsabilità chiave:
+ Gestire il ciclo di vita di RecordProcessors
+ Gestisci la gestione del leasing per i frammenti
+ Coordina il checkpoint
+ Bilanciate il carico di elaborazione degli shard tra i diversi worker della vostra applicazione
+ Gestisci segnali di spegnimento e terminazione delle applicazioni senza interruzioni

Scheduler viene in genere creato e avviato nell'applicazione principale. È possibile controllare l'esempio di implementazione di Scheduler nella sezione seguente, Main Consumer Application. 

### Applicazione principale per i consumatori
<a name="implementation-main"></a>

L'applicazione principale per i consumatori collega tutti i componenti. È responsabile della configurazione del consumatore KCL, della creazione dei client necessari, della configurazione dello Scheduler e della gestione del ciclo di vita dell'applicazione.

Responsabilità chiave:
+ Configurare i client AWS di servizio (Kinesis, DynamoDB,) CloudWatch
+ Configura l'applicazione KCL
+ Crea e avvia lo Scheduler
+ Gestisci l'arresto dell'applicazione

Di seguito è riportato un esempio di implementazione:

```
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import java.util.UUID;

public class SampleConsumer {
    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    public SampleConsumer(String streamName, Region region) {
        this.streamName = streamName;
        this.region = region;
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    public void run() {
        DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        
        ConfigsBuilder configsBuilder = new ConfigsBuilder(
            streamName, 
            streamName, 
            kinesisClient, 
            dynamoDbAsyncClient,
            cloudWatchClient, 
            UUID.randomUUID().toString(), 
            new SampleRecordProcessorFactory()
        );

        Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();
    }

    public static void main(String[] args) {
        String streamName = "your-stream-name"; // replace with your stream name
        Region region = Region.US_EAST_1; // replace with your region
        new SampleConsumer(streamName, region).run();
    }
}
```

 Per impostazione predefinita, KCL crea un consumatore Enhanced Fan-out (EFO) con throughput dedicato. Per ulteriori informazioni su Enhanced Fan-out, consulta. [Sviluppa consumatori con fan-out migliorati con un throughput dedicato](enhanced-consumers.md) Se hai meno di 2 consumatori o non hai bisogno di ritardi di propagazione in lettura inferiori a 200 ms, devi impostare la seguente configurazione nell'oggetto scheduler per utilizzare i consumatori a throughput condiviso:

```
configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
```

Il codice seguente è un esempio di creazione di un oggetto scheduler che utilizza consumatori a throughput condiviso:

**Importazioni:**

```
import software.amazon.kinesis.retrieval.polling.PollingConfig;
```

**Codice**:

```
Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );/
```