Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Sviluppa i consumatori con KCL in Java
Prerequisiti
Prima di iniziare a utilizzare KCL 3.x, assicurati di avere quanto segue:
-
Java Development Kit (JDK) 8 o versione successiva
-
AWS SDK for Java 2.x
-
Maven o Gradle per la gestione delle dipendenze
KCL raccoglie i parametri di utilizzo della CPU, come l'utilizzo della CPU, dall'host di elaborazione su cui lavorano i lavoratori per bilanciare il carico e raggiungere un livello di utilizzo uniforme delle risorse tra i lavoratori. Per consentire a KCL di raccogliere i parametri di utilizzo della CPU dai lavoratori, è necessario soddisfare i seguenti prerequisiti:
Amazon Elastic Compute Cloud(Amazon EC2)
-
Il sistema operativo deve essere Linux OS.
-
È necessario IMDSv2abilitarlo nella propria EC2 istanza.
Amazon Elastic Container Service (Amazon ECS) su Amazon EC2
-
Il sistema operativo deve essere Linux OS.
-
È necessario abilitare l'endpoint ECS Task Metadata Endpoint versione 4.
-
La versione dell'agente container Amazon ECS deve essere 1.39.0 o successiva.
Amazon ECS su AWS Fargate
-
È necessario abilitare l'endpoint dei metadati delle attività Fargate versione 4. Se si utilizza la versione 1.4.0 o successiva della piattaforma Fargate, questa opzione è abilitata per impostazione predefinita.
-
Piattaforma Fargate versione 1.4.0 o successiva.
Amazon Elastic Kubernetes Service (Amazon EKS) su Amazon EC2
-
Il sistema operativo deve essere Linux OS.
Amazon EKS su AWS Fargate
-
Piattaforma Fargate 1.3.0 o versione successiva.
Importante
Se KCL non è in grado di raccogliere i parametri di utilizzo della CPU dai lavoratori, ricorrerà alla velocità effettiva per lavoratore per assegnare i contratti di locazione e bilanciare il carico tra i lavoratori della flotta. Per ulteriori informazioni, consulta In che modo KCL assegna i contratti di locazione ai lavoratori e bilancia il carico.
Installa e aggiungi dipendenze
Se usi Maven, aggiungi la seguente dipendenza al tuo file. pom.xml
Assicurati di aver sostituito 3.x.x con l'ultima versione di KCL.
<dependency> <groupId>software.amazon.kinesis</groupId> <artifactId>amazon-kinesis-client</artifactId> <version>3.x.x</version> <!-- Use the latest version --> </dependency>
Se stai usando Gradle, aggiungi quanto segue al tuo file. build.gradle
Assicurati di aver sostituito 3.x.x con l'ultima versione di KCL.
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
Puoi verificare la versione più recente di KCL sul Maven Central Repository.
Implementa il consumatore
Un'applicazione consumer KCL è composta dai seguenti componenti chiave:
Componenti chiave
RecordProcessor
RecordProcessor è il componente principale in cui risiede la logica aziendale per l'elaborazione dei record del flusso di dati Kinesis. Definisce in che modo l'applicazione elabora i dati che riceve dal flusso Kinesis.
Responsabilità chiave:
-
Inizializza l'elaborazione per uno shard
-
Elabora batch di record dal flusso Kinesis
-
Arresta l'elaborazione di uno shard (ad esempio, quando lo shard si divide o si fonde o il leasing viene ceduto a un altro host)
-
Gestisci i checkpoint per tenere traccia dei progressi
Quanto segue mostra un esempio di implementazione:
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.*; import software.amazon.kinesis.processor.ShardRecordProcessor; public class SampleRecordProcessor implements ShardRecordProcessor { private static final String SHARD_ID_MDC_KEY = "ShardId"; private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class); private String shardId; @Override public void initialize(InitializationInput initializationInput) { shardId = initializationInput.shardId(); MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber()); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Processing {} record(s)", processRecordsInput.records().size()); processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()) ); // Checkpoint periodically processRecordsInput.checkpointer().checkpoint(); } catch (Throwable t) { log.error("Caught throwable while processing records. Aborting.", t); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Lost lease, so terminating."); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void shardEnded(ShardEndedInput shardEndedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Scheduler is shutting down, checkpointing."); shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at requested shutdown. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } }
Di seguito è riportata una spiegazione dettagliata di ogni metodo utilizzato nell'esempio:
inizializza (InitializationInputinitializationInput)
-
Scopo: impostare le risorse o lo stato necessari per l'elaborazione dei record.
-
Quando viene chiamato: una volta, quando KCL assegna uno shard a questo registratore.
-
Punti chiave:
-
initializationInput.shardId()
: L'ID dello shard che questo processore gestirà. -
initializationInput.extendedSequenceNumber()
: Il numero di sequenza da cui iniziare l'elaborazione.
-
ProcessRecords () ProcessRecordsInput processRecordsInput
-
Scopo: Elaborare i record in entrata e, facoltativamente, controllare l'avanzamento del checkpoint.
-
Quando viene chiamato: ripetutamente, purché il processore di dischi detenga il contratto di locazione dello shard.
-
Punti chiave:
-
processRecordsInput.records()
: Elenco dei record da elaborare. -
processRecordsInput.checkpointer()
: utilizzato per controllare lo stato di avanzamento. -
Assicurati di aver gestito tutte le eccezioni durante l'elaborazione per evitare che KCL fallisca.
-
Questo metodo dovrebbe essere idempotente, poiché lo stesso record può essere elaborato più di una volta in alcuni scenari, ad esempio dati che non sono stati sottoposti a checkpoint prima che un lavoratore si blocchi o riavvii imprevisti.
-
Svuota sempre tutti i dati memorizzati nel buffer prima del checkpoint per garantire la coerenza dei dati.
-
LeaseLostInput leaseLostInputleaseLost ()
-
Scopo: ripulire tutte le risorse specifiche per l'elaborazione di questo frammento.
-
Quando viene chiamato: quando un altro Scheduler assume il contratto di locazione di questo shard.
-
Punti chiave:
-
Il checkpoint non è consentito con questo metodo.
-
shardEnded () ShardEndedInput shardEndedInput
-
Scopo: completare l'elaborazione di questo frammento e di questo checkpoint.
-
Quando viene chiamato: quando lo shard si divide o si fonde, indica che tutti i dati relativi allo shard sono stati elaborati.
-
Punti chiave:
-
shardEndedInput.checkpointer()
: utilizzato per eseguire il checkpoint finale. -
Il checkpoint in questo metodo è obbligatorio per completare l'elaborazione.
-
Il mancato ripristino dei dati e del checkpoint in questo campo può comportare la perdita dei dati o un'elaborazione duplicata alla riapertura dello shard.
-
ShutdownRequestedInput shutdownRequestedInputShutdownRequested ()
-
Scopo: controllare e ripulire le risorse quando KCL si spegne.
-
Quando viene chiamato: quando KCL si spegne, ad esempio quando l'applicazione è in fase di chiusura).
-
Punti chiave:
-
shutdownRequestedInput.checkpointer()
: utilizzato per eseguire il checkpoint prima dello spegnimento. -
Assicurati di aver implementato il checkpoint nel metodo in modo che i progressi vengano salvati prima che l'applicazione si fermi.
-
Il mancato ripristino dei dati e del checkpoint in questo campo può comportare la perdita dei dati o la rielaborazione dei record al riavvio dell'applicazione.
-
Importante
KCL 3.x garantisce un minor numero di ritrattamenti dei dati quando il contratto di locazione viene ceduto da un lavoratore a un altro lavoratore effettuando un checkpoint prima che il lavoratore precedente venga chiuso. Se non implementate la logica di checkpoint nel shutdownRequested()
metodo, non vedrete questo vantaggio. Assicurati di aver implementato una logica di checkpoint all'interno del metodo. shutdownRequested()
RecordProcessorFactory
RecordProcessorFactory è responsabile della creazione di nuove RecordProcessor istanze. KCL utilizza questa factory per crearne una nuova RecordProcessor per ogni shard che l'applicazione deve elaborare.
Responsabilità chiave:
-
Crea nuove RecordProcessor istanze su richiesta
-
Assicurati che ognuna RecordProcessor sia inizializzata correttamente
Di seguito è riportato un esempio di implementazione:
import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new SampleRecordProcessor(); } }
In questo esempio, la factory ne crea una nuova SampleRecordProcessor ogni volta che viene chiamata shardRecordProcessor (). È possibile estenderlo per includere qualsiasi logica di inizializzazione necessaria.
Pianificatore
Scheduler è un componente di alto livello che coordina tutte le attività dell'applicazione KCL. È responsabile dell'orchestrazione complessiva dell'elaborazione dei dati.
Responsabilità chiave:
-
Gestire il ciclo di vita di RecordProcessors
-
Gestisci la gestione del leasing per i frammenti
-
Coordina il checkpoint
-
Bilanciate il carico di elaborazione degli shard tra i diversi worker della vostra applicazione
-
Gestisci segnali di spegnimento e terminazione delle applicazioni senza interruzioni
Scheduler viene in genere creato e avviato nell'applicazione principale. È possibile controllare l'esempio di implementazione di Scheduler nella sezione seguente, Main Consumer Application.
Applicazione principale per i consumatori
Main Consumer Application collega tutti i componenti. È responsabile della configurazione del consumatore KCL, della creazione dei client necessari, della configurazione dello Scheduler e della gestione del ciclo di vita dell'applicazione.
Responsabilità chiave:
-
Configurare i client AWS di servizio (Kinesis, DynamoDB,) CloudWatch
-
Configura l'applicazione KCL
-
Crea e avvia lo Scheduler
-
Gestisci l'arresto dell'applicazione
Di seguito è riportato un esempio di implementazione:
import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; import java.util.UUID; public class SampleConsumer { private final String streamName; private final Region region; private final KinesisAsyncClient kinesisClient; public SampleConsumer(String streamName, Region region) { this.streamName = streamName; this.region = region; this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region)); } public void run() { DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder( streamName, streamName, kinesisClient, dynamoDbAsyncClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory() ); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() ); Thread schedulerThread = new Thread(scheduler); schedulerThread.setDaemon(true); schedulerThread.start(); } public static void main(String[] args) { String streamName = "your-stream-name"; // replace with your stream name Region region = Region.US_EAST_1; // replace with your region new SampleConsumer(streamName, region).run(); } }
Per impostazione predefinita, KCL crea un consumatore Enhanced Fan-out (EFO) con throughput dedicato. Per ulteriori informazioni su Enhanced Fan-out, consulta. Sviluppa consumatori con fan-out migliorati con un throughput dedicato Se hai meno di 2 consumatori o non hai bisogno di ritardi di propagazione in lettura inferiori a 200 ms, devi impostare la seguente configurazione nell'oggetto scheduler per utilizzare i consumatori a throughput condiviso:
configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
Il codice seguente è un esempio di creazione di un oggetto scheduler che utilizza consumatori a throughput condiviso:
Importazioni:
import software.amazon.kinesis.retrieval.polling.PollingConfig;
Codice:
Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient)) );/