

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Desenvolver uma aplicação de consumo da Kinesis Client Library em Python
<a name="kinesis-record-processor-implementation-app-py"></a>

**Importante**  
As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. O KCL 1.x chegará end-of-support em 30 de janeiro de 2026. É **altamente recomendável** que você migre suas aplicações da KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a página da [biblioteca de cliente do Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) em. GitHub Para obter informações sobre as versões mais recentes da KCL, consulte [Usar a Kinesis Client Library](kcl.md). Para ter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte [Migrar da KCL 1.x para a KCL 3.x](kcl-migration-1-3.md).

É possível usar a Kinesis Client Library (KCL) para criar aplicações que processam dados dos fluxos de dados do Kinesis. A Kinesis Client Library está disponível em várias linguagens. Este tópico discute Python.

A KCL é uma biblioteca Java; o suporte para linguagens diferentes de Java é fornecido usando uma interface multilíngue chamada de. *MultiLangDaemon* Esse daemon baseado em Java é executado em segundo plano quando uma linguagem de KCL diferente de Java é utilizada. Portanto, se você instalar o KCL para Python e escrever seu aplicativo de consumidor inteiramente em Python, ainda precisará do Java instalado em seu sistema por causa do. MultiLangDaemon Além disso, MultiLangDaemon tem algumas configurações padrão que você pode precisar personalizar para seu caso de uso, por exemplo, a AWS região à qual ele se conecta. Para obter mais informações sobre o MultiLangDaemon on GitHub, acesse a página do [ MultiLangDaemon projeto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Para baixar o Python KCL em GitHub, acesse a [Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client-python) (Python). Para baixar o código de amostra para um aplicativo de consumidor do Python KCL, acesse a página do projeto de amostra do [KCL for Python](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples) em. GitHub

É necessário concluir as seguintes tarefas ao implementar uma aplicação de consumo da KCL em Python:

**Topics**
+ [Implemente os métodos RecordProcessor de classe](#kinesis-record-processor-implementation-interface-py)
+ [Modificar as propriedades de configuração](#kinesis-record-processor-initialization-py)

## Implemente os métodos RecordProcessor de classe
<a name="kinesis-record-processor-implementation-interface-py"></a>

A classe `RecordProcess` precisa estender o `RecordProcessorBase` para implementar os métodos a seguir. O exemplo fornece implementações que podem ser usadas como ponto de partida (consulte `sample_kclpy_app.py`).

```
def initialize(self, shard_id)
def process_records(self, records, checkpointer)
def shutdown(self, checkpointer, reason)
```

**inicializar**  
A KCL chama o método `initialize` quando o processador de registros é instanciado, passando um ID de fragmento específico como um parâmetro. Esse processador de registros processa apenas esse fragmento e, normalmente, o inverso também é verdadeiro (esse fragmento é processado somente por esse processador de registro). No entanto, a aplicação de consumo deve considerar a possibilidade de que um registro de dados pode ser processado mais de uma vez. Isso acontece porque a semântica do Kinesis Data Streams é do tipo *pelo menos uma vez*, o que significa que cada registro de dados de um fragmento é processado pelo menos uma vez por um operador no consumidor. Para obter mais informações sobre casos em que um fragmento específico pode ser processado por mais de um operador, consulte [Use refragmentação, escalonamento e processamento paralelo para alterar o número de fragmentos](kinesis-record-processor-scaling.md).

```
def initialize(self, shard_id)
```

**process\$1records**  
 A KCL chama este método passando uma lista de registros de dados do fragmento especificado pelo método `initialize`. O processador de registros implementado processará os dados nesses registros de acordo com a semântica da aplicação de consumo. Por exemplo, o operador pode executar uma transformação nos dados e, em seguida, armazenar o resultado em um bucket do Amazon Simple Storage Service (Amazon S3).

```
def process_records(self, records, checkpointer) 
```

Além dos dados em si, o registro também contém um número de sequência e uma chave de partição. O operador pode usar esses valores ao processar os dados. Por exemplo, o operador pode escolher o bucket do S3 no qual armazenar os dados com base no valor da chave de partição. O dicionário de `record` expõe os seguintes pares de chave/valor para acessar os dados do registro, o número de sequência e a chave de partição:

```
record.get('data')
record.get('sequenceNumber')
record.get('partitionKey')
```

Observe que os dados são codificados em Base64.

No exemplo, o método `process_records` tem código que mostra como um operador pode acessar os dados do registro, o número de sequência e a chave de partição.

O Kinesis Data Streams requer que o processador de registros rastreie os registros que já foram processados em um fragmento. A KCL faz esse rastreamento, passando um objeto `Checkpointer` para `process_records`. O processador de registros chama o método `checkpoint` neste objeto para informar a KCL sobre o progresso do processamento dos registros no fragmento. Se o operador falhar, a KCL usará essas informações para reiniciar o processamento do fragmento no último registro processado conhecido.

Em uma operação de divisão ou mesclagem, a KCL só começará a processar os novos fragmentos quando os processadores dos fragmentos originais chamarem `checkpoint` para indicar que o processamento dos fragmentos originais foi concluído.

Se você não passar um parâmetro, a KCL presumirá que a chamada para `checkpoint` significa que todos os registros foram processados até o último registro passado para o processador de registros. Portanto, o processador de registros deve chamar `checkpoint` somente após ter processado todos os registros na lista que foi passada a ele. Os processadores de registros não precisam chamar `checkpoint` em cada chamada para `process_records`. Um processador pode, por exemplo, chamar `checkpoint` a cada terceira chamada. É possível, opcionalmente, especificar o número de sequência exato de um registro como um parâmetro para `checkpoint`. Nesse caso, a KCL presume que todos os registros foram processados somente até o registro especificado.

No exemplo, o método privado `checkpoint` mostra como chamar o método `Checkpointer.checkpoint` usando a lógica de novas tentativas e o tratamento de exceções apropriados.

A KCL depende do `process_records` para lidar com qualquer exceção ocorrida no processamento dos registros de dados. Se ocorrer uma exceção em `process_records`, a KCL ignorará os registros de dados passados para `process_records` antes da exceção. Ou seja, esses registros não serão reenviados para o processador de registros que lançou a exceção ou para qualquer outro processador de registros na aplicação de consumo.

**shutdown**  
 A KCL chama o método `shutdown` quando o processamento termina (o motivo do desligamento é `TERMINATE`) ou quando o operador não está mais respondendo (o `reason` do desligamento é `ZOMBIE`).

```
def shutdown(self, checkpointer, reason)
```

O processamento termina quando o processador de registros não recebe mais registros do fragmento porque ele foi dividido ou intercalado, ou o fluxo foi excluído.

 A KCL também passa um objeto `Checkpointer` para `shutdown`. Se o `reason` do desligamento é `TERMINATE`, o processador de registros deve terminar o processamento de todos os registros de dados e, em seguida, chamar o método `checkpoint` nesta interface.

## Modificar as propriedades de configuração
<a name="kinesis-record-processor-initialization-py"></a>

O exemplo fornece valores padrão para as propriedades de configuração. É possível substituir qualquer uma dessas propriedades por seus próprios valores (consulte `sample.properties`).

### Nome da aplicação
<a name="kinesis-record-processor-application-name-py"></a>

A KCL exige um nome de aplicação exclusivo entre as aplicações e as tabelas do Amazon DynamoDB na mesma região. Ela usa o valor de configuração de nome de aplicativo das seguintes formas:
+ Presume-se que todos os operadores associados a esse nome de aplicativo estejam trabalhando juntos no mesmo fluxo. Esses operadores podem ser distribuídos em várias instâncias. Se você executar uma instância adicional do mesmo código da aplicação, mas com um nome diferente, a KCL tratará a segunda instância como uma aplicação totalmente independente operando no mesmo fluxo.
+ A KCL cria uma tabela do DynamoDB com o nome da aplicação e usa essa tabela para manter informações de estado (como pontos de verificação e mapeamento de operador-fragmento) da aplicação. Cada aplicação tem sua própria tabela do DynamoDB. Para obter mais informações, consulte [Usar uma tabela de concessões para monitorar os fragmentos processados pela aplicação de consumo da KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configurar credenciais
<a name="kinesis-record-processor-creds-py"></a>

Você deve disponibilizar suas AWS credenciais para um dos provedores de credenciais na cadeia de provedores de credenciais padrão. A propriedade `AWSCredentialsProvider` pode ser usada para definir um provedor de credenciais. As [sample.properties](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) precisam disponibilizar as credenciais para um dos provedores de credenciais na [cadeia de provedores de credenciais padrão](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Ao executar a aplicação de consumo em uma instância do Amazon EC2, recomenda-se que a instância seja configurada com um perfil do IAM. As credenciais da AWS que refletem as permissões associadas a esse perfil do IAM são disponibilizadas às aplicações na instância por meio dos metadados da instância. Essa é a maneira mais segura de gerenciar credenciais para uma aplicação de consumo em execução em uma instância do EC2.

O arquivo de propriedades do exemplo configura a KCL para processar um fluxo de dados do Kinesis chamado “words” usando o processador de registros fornecido em `sample_kclpy_app.py`. 