Sviluppa un utente di Kinesis Client Library in. NET - Flusso di dati Amazon Kinesis

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Sviluppa un utente di Kinesis Client Library in. NET

È possibile utilizzare Kinesis Client Library (KCL) per creare applicazioni che elaborano i dati dai flussi di dati Kinesis. La Kinesis Client Library è disponibile in più linguaggi. Questo argomento discute. NET.

KCLÈ una libreria Java; il supporto per linguaggi diversi da Java viene fornito utilizzando un'interfaccia multilingue chiamata. MultiLangDaemon Questo demone è basato su Java e viene eseguito in background quando si utilizza un KCL linguaggio diverso da Java. Pertanto, se si installa il for. KCL NETe scrivi interamente la tua app per i consumatori. NET, è comunque necessario che Java sia installato sul sistema a causa di MultiLangDaemon. Inoltre, MultiLangDaemon presenta alcune impostazioni predefinite che potresti dover personalizzare in base al tuo caso d'uso, ad esempio la AWS regione a cui si connette. Per ulteriori informazioni su MultiLangDaemon on GitHub, vai alla pagina del KCL MultiLangDaemon progetto.

Per scaricare il. NETKCLda GitHub, vai a Kinesis Client Library (. NET). Per scaricare il codice di esempio per un. NETKCLapplicazione per consumatori, vai al KCLmodulo. NETpagina di esempio del progetto per i consumatori su GitHub.

È necessario completare le seguenti attività quando si implementa un'applicazione KCL consumer in. NET:

Implementa i metodi IRecordProcessor della classe

Il consumer deve implementare i seguenti metodi per IRecordProcessor. Il consumer di esempio fornisce implementazioni che è possibile utilizzare come punto di partenza (consulta la classe SampleRecordProcessor in SampleConsumer/AmazonKinesisSampleConsumer.cs).

public void Initialize(InitializationInput input) public void ProcessRecords(ProcessRecordsInput input) public void Shutdown(ShutdownInput input)
Inizializzazione

KCLChiama questo metodo quando viene creata un'istanza del processore di registrazione, passando uno shard ID specifico nel input parametro (). input.ShardId Questo processore di record elabora esclusivamente questo shard e, in genere, è vero anche il contrario (questo shard è elaborato solo da questo processore di record). Tuttavia, il tuo consumer deve tenere conto della possibilità che un record di dati possa essere elaborato più di una volta. Ciò si verifica perché il flusso di dati Kinesis ha una semantica almeno una volta, il che significa che ogni record di dati da una partizione viene elaborato almeno una volta da un worker nel tuo consumer. Per ulteriori informazioni sui casi in cui un determinato shard può essere elaborato da più di un lavoratore, consulta Usa il resharding, lo scaling e l'elaborazione parallela per modificare il numero di shard.

public void Initialize(InitializationInput input)
ProcessRecords

KCLChiama questo metodo, passando un elenco di record di dati nel input parametro (input.Records) dallo shard specificato dal metodo. Initialize Il processore di record che implementi elabora i dati in questi record in base alla semantica del tuo consumer. Ad esempio, il worker potrebbe eseguire una trasformazione dei dati e, successivamente, archiviare il risultato in un bucket Amazon Simple Storage Service (Amazon S3).

public void ProcessRecords(ProcessRecordsInput input)

Oltre ai dati stessi, il record contiene anche un numero di sequenza e una chiave di partizione. Il lavoratore può utilizzare questi valori quando elabora i dati. Ad esempio, il lavoratore può scegliere il bucket S3 in cui archiviare i dati in base al valore della chiave di partizione. La classe Record espone quanto segue per accedere ai dati del record, al numero di sequenza e alla chiave di partizione:

byte[] Record.Data string Record.SequenceNumber string Record.PartitionKey

Nell'esempio, il metodo ProcessRecordsWithRetries ha un codice che mostra in che modo un lavoratore può accedere ai dati del record, al numero di sequenza e alla chiave di partizione.

Il flusso di dati Kinesis richiede che il processore di record tenga traccia dei record che sono già stati elaborati in una partizione. Si KCL occupa di questo tracciamento per te passando un Checkpointer oggetto a ProcessRecords (input.Checkpointer). Il processore KCL di registrazione chiama il Checkpointer.Checkpoint metodo per informarli sui progressi compiuti nell'elaborazione dei record nello shard. Se l'operatore fallisce, KCL utilizza queste informazioni per riavviare l'elaborazione dello shard sull'ultimo record elaborato conosciuto.

Per un'operazione di divisione o unione, KCL non avvia l'elaborazione dei nuovi shard finché i processori degli shard originali non hanno chiamato per Checkpointer.Checkpoint segnalare che tutta l'elaborazione sugli shard originali è completa.

Se non si passa un parametro, KCL si presuppone che la chiamata a indichi che Checkpointer.Checkpoint tutti i record sono stati elaborati, fino all'ultimo record passato al registratore. Pertanto, il processore di record deve chiamare Checkpointer.Checkpoint solo dopo aver elaborato tutti i record nell'elenco passato al processore. I processori di record non devono chiamare Checkpointer.Checkpoint in ciascuna chiamata a ProcessRecords. Un processore potrebbe, per esempio, chiamare Checkpointer.Checkpoint in ogni terza o quarta chiamata. Puoi specificare, in modo facoltativo, il numero di sequenza esatto di un record come parametro per Checkpointer.Checkpoint. In questo caso, KCL si presuppone che i record siano stati elaborati solo fino a quel record.

Nell'esempio, il metodo privato Checkpoint(Checkpointer checkpointer) mostra come effettuare la chiamata al metodo Checkpointer.Checkpoint utilizzando la gestione delle eccezioni e la logica dei nuovi tentativi appropriate.

La KCL per. NETgestisce le eccezioni in modo diverso rispetto alle altre librerie di KCL lingue in quanto non gestisce le eccezioni derivanti dall'elaborazione dei record di dati. Le eccezioni non rilevate dal codice dell'utente determinano l'arresto del programma.

Arresto

KCLRichiama il Shutdown metodo quando l'elaborazione termina (il motivo della chiusura èTERMINATE) o quando l'operatore non risponde più (il valore di spegnimento è). input.Reason ZOMBIE

public void Shutdown(ShutdownInput input)

L'elaborazione termina quando il processore di record non riceve ulteriori record dallo shard, perché lo shard è stato frazionato o fuso o perché il flusso è stato eliminato.

Passa KCL anche un oggetto a. Checkpointer shutdown Se il motivo dell'arresto è TERMINATE, il processore di record deve terminare l'elaborazione di qualsiasi record di dati e, di seguito, chiamare il metodo checkpoint in questa interfaccia.

Modificare le proprietà di configurazione

Il consumer di esempio fornisce valori di default per le proprietà di configurazione. È possibile sostituire una qualsiasi di queste proprietà con i tuoi valori (consulta SampleConsumer/kcl.properties).

Nome applicazione

KCLRichiede un'applicazione che sia unica tra le tue applicazioni e tra le tabelle Amazon DynamoDB nella stessa regione. La biblioteca utilizza il valore di configurazione del nome dell'applicazione nei seguenti modi:

  • Si suppone che tutti i lavoratori associati con questo nome dell'applicazione stiano lavorando insieme nello stesso flusso. Questi lavoratori potrebbero essere distribuiti su più istanze. Se esegui un'istanza aggiuntiva dello stesso codice applicativo, ma con un nome di applicazione diverso, KCL considera la seconda istanza come un'applicazione completamente separata che opera anch'essa sullo stesso flusso.

  • KCLCrea una tabella DynamoDB con il nome dell'applicazione e utilizza la tabella per conservare le informazioni sullo stato (come i checkpoint e la mappatura dei worker-shard) per l'applicazione. Ogni applicazione ha la propria tabella DynamoDB. Per ulteriori informazioni, consulta Utilizzate una tabella di leasing per tenere traccia degli shard elaborati dall'applicazione consumer KCL.

Configura le credenziali

È necessario rendere disponibili le AWS credenziali a uno dei provider di credenziali della catena di provider di credenziali predefinita. Puoi utilizzare la proprietà AWSCredentialsProvider per impostare un provider di credenziali. Le proprietà di esempio devono rendere le tue credenziali disponibili per uno dei provider di credenziali nella catena di provider di credenziali di default. Se stai eseguendo l'applicazione consumer su un'EC2istanza, ti consigliamo di configurare l'istanza con un ruolo. IAM AWS le credenziali che riflettono le autorizzazioni associate a questo IAM ruolo vengono rese disponibili alle applicazioni sull'istanza tramite i relativi metadati dell'istanza. Questo è il modo più sicuro per gestire le credenziali per un consumatore che esegue un'istanza. EC2

Il file delle proprietà dell'esempio si configura KCL per elaborare un flusso di dati Kinesis chiamato «words» utilizzando il processore di registrazione fornito in dotazione. AmazonKinesisSampleConsumer.cs