Sviluppa un utente della Kinesis Client Library in Python - Flusso di dati Amazon Kinesis

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Sviluppa un utente della Kinesis Client Library in Python

È possibile utilizzare Kinesis Client Library (KCL) per creare applicazioni che elaborano i dati dai flussi di dati Kinesis. La Kinesis Client Library è disponibile in più linguaggi. In questo argomento viene discusso Python.

KCLÈ una libreria Java; il supporto per linguaggi diversi da Java viene fornito utilizzando un'interfaccia multilingue chiamata. MultiLangDaemon Questo demone è basato su Java e viene eseguito in background quando si utilizza un KCL linguaggio diverso da Java. Pertanto, se installi KCL for Python e scrivi la tua app consumer interamente in Python, avrai comunque bisogno che Java sia installato sul tuo sistema a causa di. MultiLangDaemon Inoltre, MultiLangDaemon ha alcune impostazioni predefinite che potresti dover personalizzare in base al tuo caso d'uso, ad esempio, la AWS regione a cui si connette. Per ulteriori informazioni su MultiLangDaemon on GitHub, vai alla pagina del KCL MultiLangDaemon progetto.

Per scaricare Python KCL da GitHub, vai alla Kinesis Client Library (Python). Per scaricare il codice di esempio per un'applicazione KCL consumer Python, vai alla pagina del progetto di esempio per KCL Python su. GitHub

È necessario completare le seguenti attività quando si implementa un'applicazione KCL consumer in Python:

Implementa i metodi della RecordProcessor classe

La classe RecordProcess deve estendere la RecordProcessorBase per implementare i seguenti metodi. L'esempio fornisce implementazioni che è possibile utilizzare come punto di partenza (consulta sample_kclpy_app.py).

def initialize(self, shard_id) def process_records(self, records, checkpointer) def shutdown(self, checkpointer, reason)
initialize

KCLChiama il initialize metodo quando viene creata un'istanza del processore di registrazione, passando uno shard ID specifico come parametro. Questo processore di record elabora esclusivamente questo shard e, in genere, è vero anche il contrario (questo shard è elaborato solo da questo processore di record). Tuttavia, il tuo consumer deve tenere conto della possibilità che un record di dati possa essere elaborato più di una volta. Ciò si verifica perché il flusso di dati Kinesis ha una semantica almeno una volta, il che significa che ogni record di dati da una partizione viene elaborato almeno una volta da un worker nel tuo consumer. Per ulteriori informazioni sui casi in cui un determinato shard può essere elaborato da più di un lavoratore, consulta Usa il resharding, lo scaling e l'elaborazione parallela per modificare il numero di shard.

def initialize(self, shard_id)
process_records

KCLChiama questo metodo, passando un elenco di record di dati dallo shard specificato dal metodo. initialize Il processore di record che implementi elabora i dati in questi record in base alla semantica del tuo consumer. Ad esempio, il worker potrebbe eseguire una trasformazione dei dati e, successivamente, archiviare il risultato in un bucket Amazon Simple Storage Service (Amazon S3).

def process_records(self, records, checkpointer)

Oltre ai dati stessi, il record contiene anche un numero di sequenza e una chiave di partizione. Il lavoratore può utilizzare questi valori quando elabora i dati. Ad esempio, il lavoratore può scegliere il bucket S3 in cui archiviare i dati in base al valore della chiave di partizione. Il dizionario record espone le seguenti coppie chiave-valore per accedere ai dati del record, al numero di sequenza e alla chiave di partizione:

record.get('data') record.get('sequenceNumber') record.get('partitionKey')

Tieni presente che i dati sono codificati in Base64.

Nell'esempio, il metodo process_records ha un codice che mostra in che modo un lavoratore può accedere ai dati del record, al numero di sequenza e alla chiave di partizione.

Il flusso di dati Kinesis richiede che il processore di record tenga traccia dei record che sono già stati elaborati in una partizione. KCLSi occupa di questo tracciamento per te passando un Checkpointer oggetto aprocess_records. Il processore di registrazione chiama il checkpoint metodo su questo oggetto per informarlo KCL dei progressi compiuti nell'elaborazione dei record nello shard. Se l'operatore fallisce, KCL utilizza queste informazioni per riavviare l'elaborazione dello shard sull'ultimo record elaborato conosciuto.

Per un'operazione di divisione o unione, KCL non avvia l'elaborazione dei nuovi shard finché i processori degli shard originali non hanno chiamato per checkpoint segnalare che tutta l'elaborazione sugli shard originali è completa.

Se non si passa un parametro, KCL si presuppone che la chiamata a checkpoint indichi che tutti i record sono stati elaborati, fino all'ultimo record passato al registratore. Pertanto, il processore di record deve chiamare checkpoint solo dopo aver elaborato tutti i record nell'elenco passato al processore. I processori di record non devono chiamare checkpoint in ciascuna chiamata a process_records. Un processore potrebbe, per esempio, chiamare checkpoint in ogni terza chiamata. Puoi specificare, in modo facoltativo, il numero di sequenza esatto di un record come parametro per checkpoint. In questo caso, si KCL presuppone che tutti i record siano stati elaborati solo fino a quel record.

Nell'esempio, il metodo privato checkpoint mostra come effettuare la chiamata al metodo Checkpointer.checkpoint utilizzando la gestione delle eccezioni e la logica dei nuovi tentativi appropriate.

Si KCL affida process_records alla gestione di eventuali eccezioni derivanti dall'elaborazione dei record di dati. Se viene generata un'eccezione daprocess_records, KCL salta i record di dati passati prima dell'eccezione. process_records Ciò significa che questi record non sono inviati nuovamente al processore di record che ha generato l'eccezione o a qualsiasi altro processore di record nel consumer.

shutdown

KCLRichiama il shutdown metodo quando l'elaborazione termina (il motivo è l'arrestoTERMINATE) o l'operatore non risponde più (lo è l'arresto). reason ZOMBIE

def shutdown(self, checkpointer, reason)

L'elaborazione termina quando il processore di record non riceve ulteriori record dallo shard, perché lo shard è stato frazionato o fuso o perché il flusso è stato eliminato.

Passa KCL anche un oggetto a. Checkpointer shutdown Se il reason dell'arresto è TERMINATE, il processore di record deve terminare l'elaborazione di qualsiasi record di dati e, di seguito, chiamare il metodo checkpoint in questa interfaccia.

Modificare le proprietà di configurazione

L'esempio fornisce valori di default per le proprietà di configurazione. È possibile sostituire una qualsiasi di queste proprietà con i tuoi valori (consulta sample.properties).

Nome applicazione

KCLRichiede un nome di applicazione che sia unico tra le tue applicazioni e tra le tabelle Amazon DynamoDB nella stessa regione. La biblioteca utilizza il valore di configurazione del nome dell'applicazione nei seguenti modi:

  • Si suppone che tutti i lavoratori associati con questo nome dell'applicazione stiano lavorando insieme nello stesso flusso. Questi lavoratori possono essere distribuiti su più istanze. Se esegui un'istanza aggiuntiva dello stesso codice applicativo, ma con un nome di applicazione diverso, KCL considera la seconda istanza come un'applicazione completamente separata che opera anch'essa sullo stesso flusso.

  • KCLCrea una tabella DynamoDB con il nome dell'applicazione e utilizza la tabella per conservare le informazioni sullo stato (come i checkpoint e la mappatura dei worker-shard) per l'applicazione. Ogni applicazione ha la propria tabella DynamoDB. Per ulteriori informazioni, consulta Utilizzate una tabella di leasing per tenere traccia degli shard elaborati dall'applicazione consumer KCL.

Configura le credenziali

È necessario rendere disponibili le AWS credenziali a uno dei provider di credenziali della catena di provider di credenziali predefinita. Puoi utilizzare la proprietà AWSCredentialsProvider per impostare un provider di credenziali. Le proprietà di esempio devono rendere le tue credenziali disponibili per uno dei provider di credenziali nella catena di provider di credenziali di default. Se esegui la tua applicazione consumer su un'EC2istanza Amazon, ti consigliamo di configurare l'istanza con un IAM ruolo. AWS le credenziali che riflettono le autorizzazioni associate a questo IAM ruolo vengono rese disponibili alle applicazioni sull'istanza tramite i relativi metadati dell'istanza. Questo è il modo più sicuro per gestire le credenziali per un'applicazione consumer in esecuzione su un'istanza. EC2

Il file delle proprietà dell'esempio si configura KCL per elaborare un flusso di dati Kinesis chiamato «words» utilizzando il processore di registrazione fornito in dotazione. sample_kclpy_app.py