

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Usar a Kinesis Client Library
<a name="kcl"></a>

## O que é a Kinesis Client Library?
<a name="kcl-library-what-is"></a>

A Kinesis Client Library (KCL) é uma biblioteca de software Java independente projetada para simplificar o processo de consumo e processamento de dados do Amazon Kinesis Data Streams. A KCL lida com muitas das tarefas complexas associadas à computação distribuída, o que permite que os desenvolvedores se concentrem na implementação da lógica de negócios para o processamento dos dados. Ela gerencia atividades como balanceamento de carga entre vários operadores, resposta a falhas de operadores, verificação dos registros processados e resposta a mudanças no número de fragmentos no fluxo.

A KCL é atualizada com frequência para incorporar as versões mais recentes das bibliotecas subjacentes, promover melhorias de segurança e realizar correções de bugs. É recomendável que você use a versão mais recente da KCL para evitar os problemas já conhecidos e se beneficiar de todas as melhorias mais recentes. Para encontrar a versão mais recente da KCL, consulte [Github da KCL](https://github.com/awslabs/amazon-kinesis-client). 

**Importante**  
É recomendável que você use a versão mais recente da KCL para evitar os bugs e problemas já conhecidos. Se você estiver usando a KCL 2.6.0 ou versão anterior, atualize para a KCL 2.6.1 ou versão posterior para evitar uma condição rara que pode bloquear o processamento de fragmentos quando a capacidade do fluxo for alterada. 
A KCL é uma biblioteca Java. Support para linguagens diferentes de Java é fornecido usando um daemon baseado em Java chamado. MultiLangDaemon MultiLangDaemoninterage com o aplicativo KCL por meio de STDIN e STDOUT. Para obter mais informações sobre o MultiLangDaemon on GitHub, consulte[Desenvolva consumidores com a KCL em linguagens não Java](develop-kcl-consumers-non-java.md).
Não use as AWS SDK para Java versões 2.27.19 a 2.27.23 com KCL 3.x. Essas versões têm um problema que causa um erro de exceção relacionado ao uso do DynamoDB da KCL. Recomendamos que você use a AWS SDK para Java versão 2.28.0 ou posterior para evitar esse problema. 

## Principais atributos e benefícios da KCL
<a name="kcl-benefits"></a>

A seguir, alguns dos principais atributos e benefícios correspondentes da KCL:
+ **Escalabilidade**: a KCL permite que as aplicações sejam escaladas dinamicamente distribuindo a carga de processamento entre vários operadores. Você pode escalar sua aplicação, removendo ou expandindo manualmente ou com ajuste de escala automático, sem se preocupar com a redistribuição de carga.
+ **Balanceamento de carga**: a KCL equilibra automaticamente a carga de processamento entre os operadores disponíveis, resultando em uma distribuição uniforme do trabalho entre os operadores.
+ **Ponto de verificação**: a KCL gerencia o ponto de verificação dos registros processados, permitindo que as aplicações retomem o processamento a partir da última posição processada com êxito.
+ **Tolerância a falhas**: a KCL fornece mecanismos integrados de tolerância a falhas, garantindo que o processamento de dados continue, mesmo que operadores individuais falhem. A KCL também fornece at-least-once entrega.
+ **Tratamento de mudanças no fluxo**: a KCL se adapta às divisões e mesclagens de fragmentos que podem ocorrer devido a alterações no volume de dados. Ela mantém o ordenamento ao assegurar que os fragmentos secundários sejam processados somente após o fragmento principal ser concluído e verificado.
+ **Monitoramento**: a KCL se integra à Amazon CloudWatch para monitoramento em nível de consumidor.
+ **Suporte a vários idiomas**: o KCL oferece suporte nativo a Java e habilita várias linguagens de programação não Java. MultiLangDaemon

# Conceitos da KCL
<a name="kcl-concepts"></a>

Essa seção explica os principais conceitos e interações da Kinesis Client Library (KCL). Esses conceitos são fundamentais para desenvolver e gerenciar aplicações consumidoras da KCL.
+ **Aplicação de consumo da KCL**: uma aplicação personalizada projetada para ler e processar os registros dos fluxos de dados do Kinesis usando a Kinesis Client Library.
+ **Operador**: as aplicações consumidoras da KCL são distribuídas de modo típico, com um ou mais operadores em execução simultânea. A KCL coordena os operadores para que consumam os dados do fluxo de forma distribuída e equilibra a carga uniformemente entre vários operadores.
+ **Agendador**: uma classe de alto nível que um operador da KCL usa para começar a processar os dados. Cada operador da KCL tem um agendador. O agendador inicializa e supervisiona várias tarefas, incluindo a sincronização de informações de fragmentos dos fluxos de dados do Kinesis, o monitoramento de atribuições de fragmentos entre os operadores e o processamento dos dados do fluxo com base nos fragmentos atribuídos ao operador. O agendador pode usar várias configurações que afetam seu comportamento, como o nome do fluxo a ser processado e as credenciais da AWS . O agendador inicia a entrega dos registros de dados do fluxo para os processadores de registros.
+ **Processador de registros**: define a lógica pela qual a aplicação de consumo da KCL processa os dados obtidos dos fluxos de dados. É necessário implementar sua lógica de processamento de dados personalizada no processador de registros. Um operador da KCL instancia um agendador. Em seguida, o agendador instancia um processador de registros para cada fragmento para o qual tenha concessão. Um operador pode executar vários processadores de registros.
+ **Concessão**: define a atribuição entre um operador e um fragmento. As aplicações consumidoras da KCL usam concessões para distribuir o processamento de registros de dados por vários operadores. Cada fragmento está vinculado a apenas um operador por meio de uma concessão em um determinado momento e cada operador pode manter um ou mais concessões simultaneamente. Quando um operador deixa de manter uma concessão devido a interrupção ou falha, a KCL atribui outro operador para assumir a concessão. Para mais detalhes sobre a concessão, consulte [Documentação do Github: ciclo de vida da concessão](https://github.com/awslabs/amazon-kinesis-client/blob/master/docs/lease-lifecycle.md#lease-lifecycle).
+ **Tabela de concessão**: é uma tabela exclusiva do Amazon DynamoDB usada para rastrear todas as concessões da aplicação de consumo da KCL. Cada aplicação de consumo da KCL cria sua própria tabela de concessões. A tabela de concessões é usada para manter o estado de todos os operadores para coordenar o processamento de dados. Para obter mais informações, consulte [Tabelas de metadados do DynamoDB e balanceamento de carga na KCL](kcl-dynamoDB.md).
+ **Ponto de verificação**: é o processo de armazenar persistentemente a posição do último registro processado com sucesso em um fragmento. A KCL gerencia o ponto de verificação para garantir que o processamento possa ser retomado a partir da última posição desse ponto se um operador falhar ou a aplicação for reiniciada. Os pontos de verificação são armazenados na tabela de concessão do DynamoDB como parte dos metadados da concessão. Isso permite que os operadores continuem o processamento a partir do ponto em que o operador anterior parou.

# Tabelas de metadados do DynamoDB e balanceamento de carga na KCL
<a name="kcl-dynamoDB"></a>

A KCL gerencia metadados, como concessões e métricas de utilização da CPU dos operadores. A KCL rastreia esses metadados usando as tabelas do DynamoDB. Para cada aplicação do Amazon Kinesis Data Streams, a KCL cria três tabelas do DynamoDB para gerenciar os metadados: tabela de concessão, tabela de métricas do operador e tabela de estado do coordenador.

**nota**  
A KCL 3.x introduziu duas novas tabelas de metadados: *métricas do operador* e tabelas de *estado do coordenador*.

**Importante**  
 Adicione as permissões adequadas às aplicações da KCL para criar e gerenciar tabelas de metadados no DynamoDB. Para obter detalhes, consulte [Permissões do IAM necessárias para aplicativos de consumo da KCL](kcl-iam-permissions.md).  
O aplicação de consumo da KCL não remove automaticamente essas três tabelas de metadados do DynamoDB. Remova as tabelas de metadados do DynamoDB criadas pela aplicação de consumo da KCL ao descomissionar sua aplicação de consumo para evitar custos desnecessários.

## Tabela de concessões
<a name="kcl-leasetable"></a>

A tabela de concessões é uma tabela exclusiva do Amazon DynamoDB usada para monitorar os fragmentos cedidos e processados pelos agendadores da aplicação de consumo da KCL. Cada aplicação de consumo da KCL cria sua própria tabela de concessões. A KCL usa o nome da aplicação de consumo como nome da tabela de concessões por padrão. Você pode definir um nome de tabela personalizado usando a configuração. A KCL também cria um [índice secundário global](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GSI.html) na tabela de concessões com a chave de partição do leaseOwner para uma descoberta eficiente da concessão. O índice secundário global reflete o atributo leaseKey da tabela básica de concessões. Se a tabela de concessões da aplicação de consumo da KCL não existir quando a aplicação for inicializada, um dos operadores a criará.

É possível visualizar a tabela usando o [console do Amazon DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ConsoleDynamoDB.html) enquanto a aplicação de consumo está em execução.

**Importante**  
Cada nome de aplicação de consumo da KCL deve ser exclusivo para evitar a duplicação do nome da tabela de concessões. 
Sua conta é cobrada pelos custos associados à tabela do DynamoDB, além dos custos associados ao próprio Kinesis Data Streams. 

Cada linha na tabela de concessões representa um fragmento que está sendo processado pelos agendadores da aplicação de consumo. Os campos principais incluem o seguinte:
+ **leaseKey**: para processamento de fluxo único, essa é a ID do fragmento. Para o processamento de vários fluxos com a KCL, ele é estruturado como `account-id:StreamName:streamCreationTimestamp:ShardId`. A leaseKey é a chave de partição da tabela de concessões. Para obter mais informações sobre o processamento de vários fluxos, consulte [Processamento de vários fluxos com a KCL](kcl-multi-stream.md).
+ **checkpoint:** número de sequência do ponto de verificação mais recente do fragmento. 
+ **checkpointSubSequenceNúmero:** ao usar o recurso de agregação da Kinesis Producer Library, essa é uma extensão do **ponto de verificação** que rastreia registros individuais de usuários dentro do registro do Kinesis.
+ **leaseCounter**: usado para verificar se um operador está processando atualmente a concessão de modo ativo. O leaseCounter aumenta se a propriedade da concessão for transferida para outro operador.
+ **leaseOwner:** é o operador atual que detém essa concessão.
+ **ownerSwitchesSincePonto de controle:** Quantas vezes esse contrato mudou de trabalhadores desde o último posto de controle.
+ **parentShardId:** ID do pai desse fragmento. Garante que o fragmento principal seja totalmente processado antes do início do processamento dos fragmentos secundários, mantendo a ordem correta de processamento do registro.
+ **childShardId:** Lista de fragmentos secundários IDs resultantes da divisão ou mesclagem desse fragmento. Usado para rastrear a linhagem de fragmentos e gerenciar a ordem de processamento durante as operações de refragmentação.
+ **startingHashKey:** o limite inferior do intervalo de chaves de hash desse fragmento.
+ **endingHashKey:** o limite superior do intervalo de chaves de hash desse fragmento.

Se adotar o processamento de vários fluxos com a KCL, você visualizará os dois campos adicionais a seguir na tabela de concessões. Para obter mais informações, consulte [Processamento de vários fluxos com a KCL](kcl-multi-stream.md).
+ **shardID:** o ID do fragmento.
+ **streamName:** é o identificador do fluxo de dados no formato: `account-id:StreamName:streamCreationTimestamp`.

## Tabela de métricas do operador
<a name="kcl-worker-metrics-table"></a>

A tabela de métricas do operador é uma tabela exclusiva do Amazon DynamoDB para cada aplicação da KCL e é usada para registrar as métricas de utilização da CPU de cada operador. Essas métricas serão usadas pela KCL para atribuir concessões de modo eficiente, resultando no uso equilibrado dos recursos entre os operadores. A KCL usa `KCLApplicationName-WorkerMetricStats` como nome da tabela de métricas do operador por padrão.

## Tabela de estados do coordenador
<a name="kcl-coordinator-state-table"></a>

A tabela de estados do coordenador é uma tabela exclusiva do Amazon DynamoDB para cada aplicação da KCL e é usada para armazenar informações de estado internas dos operadores. Por exemplo, a tabela de estados do coordenador armazena dados sobre a eleição do líder ou os metadados associados à migração local da KCL 2.x para a KCL 3.x. A KCL usa `KCLApplicationName-CoordinatorState` como nome da tabela de estados do coordenador por padrão.

## Modo de capacidade do DynamoDB para tabelas de metadados criadas pela KCL
<a name="kcl-capacity-mode"></a>

Por padrão, a Kinesis Client Library (KCL) cria tabelas de metadados do DynamoDB, como tabela de concessões, tabela de métricas de operadores e tabela de estados do coordenador usando o [modo de capacidade sob demanda](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/on-demand-capacity-mode.html). Esse modo dimensiona automaticamente a capacidade de leitura e gravação para acomodar o tráfego sem exigir planejamento de capacidade. É altamente recomendável que você mantenha o modo de capacidade como modo sob demanda para uma operação mais eficiente dessas tabelas de metadados.

Se você decidir mudar a tabela de concessão para o [modo de capacidade provisionada](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/provisioned-capacity-mode.html), siga estas práticas recomendadas:
+ Analise os padrões de uso:
  + Monitore os padrões e usos de leitura e gravação do seu aplicativo (RCU, WCU) usando métricas da Amazon. CloudWatch 
  + Entenda os requisitos de pico e médios de throughput.
+ Calcule a capacidade necessária:
  + Estime as unidades de capacidade de leitura (RCUs) e as unidades de capacidade de gravação (WCUs) com base em sua análise.
  + Considere fatores como o número de fragmentos, a frequência dos pontos de verificação e o número de operadores.
+ Implemente o ajuste de escala automático:
  + use o [ajuste de escala automático do DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/provisioned-capacity-mode.html#ddb-autoscaling) para ajustar automaticamente a capacidade provisionada e definir os limites de capacidade mínima e máxima apropriados. 
  + O ajuste de escala automático do DynamoDB ajudará a evitar que a tabela de metadados da KCL atinja o limite de capacidade e a utilização passe a ser controlada.
+ Monitoramento e otimização regulares:
  + Monitore continuamente CloudWatch as métricas de`ThrottledRequests`.
  + Ajuste a capacidade à medida que sua workload for alterada ao longo do tempo.

Se você tiver `ProvisionedThroughputExceededException` nas tabelas de metadados do DynamoDB para sua aplicação de consumo da KCL, você precisará aumentar a capacidade de throughput provisionada da tabela do DynamoDB. Se você definir um determinado nível de unidades de capacidade de leitura (RCU) e de unidades de capacidade de gravação (WCU) na primeira vez que criar sua aplicação de consumo, esse nível pode não ser suficiente à medida que seu uso aumentar. Por exemplo, se uma aplicação de consumo da KCL realiza pontos de verificação com frequência ou opera em um fluxo com vários fragmentos, talvez sejam necessárias unidades com capacidade maior. Para obter informações sobre throughput provisionada no DynamoDB, consulte a [capacidade de throughput do DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/capacity-mode.html) e [atualização de uma tabela](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithTables.Basics.html#WorkingWithTables.Basics.UpdateTable) no Guia do desenvolvedor do Amazon DynamoDB.

## Como a KCL atribui concessões aos operadores e equilibra a carga
<a name="kcl-assign-leases"></a>

A KCL coleta e monitora continuamente as métricas de utilização da CPU dos hosts de computação que executam os operadores para garantir uma distribuição uniforme de workload. Essas métricas de utilização da CPU são armazenadas na tabela de métricas do operador no DynamoDB. Se a KCL detectar que alguns operadores apresentam taxas de utilização da CPU mais altas em comparação com outros, ela reatribuirá as concessões entre os operadores para reduzir a carga dos operadores mais usados. O objetivo é equilibrar a workload de forma mais uniforme em toda a frota de aplicações de consumo, evitando que algum dos operadores fique sobrecarregado. À medida que a KCL distribui a utilização da CPU em toda a frota de aplicações de consumo, você pode dimensionar corretamente a capacidade da frota da aplicação de consumo escolhendo o número certo de operadores ou usar o ajuste de escala automático para gerenciar com eficiência a capacidade computacional para obter custos mais baixos.

**Importante**  
A KCL pode coletar métricas de utilização da CPU dos operadores somente se determinados pré-requisitos forem atendidos. Para obter detalhes, consulte [Pré-requisitos](develop-kcl-consumers-java.md#develop-kcl-consumers-java-prerequisites). Se a KCL não conseguir coletar dos operadores as métricas de utilização da CPU, a KCL voltará a usar o throughput por operador para atribuir concessões e equilibrar a carga entre os operadores da frota. A KCL monitorará o throughput que cada operador recebe em um determinado momento e reatribuirá as concessões para garantir que cada operador receba um nível de throughput total semelhante das concessões atribuídas.

# Desenvolver consumidores com a KCL
<a name="develop-kcl-consumers"></a>

É possível usar a Kinesis Client Library (KCL) para criar aplicações de consumo que processam dados dos fluxos de dados do Kinesis.

A KCL está disponível em várias linguagens. Este tópico aborda como desenvolver consumidores da KCL em linguagens Java e não Java.
+ Para conhecer a referência em Javadoc da Kinesis Client Library, consulte [Javadoc da Amazon Kinesis Client Library](https://javadoc.io/doc/software.amazon.kinesis/amazon-kinesis-client/latest/index.html).
+ Para baixar o KCL para Java em GitHub, consulte a [Biblioteca de cliente do Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) para Java.
+ Para localizar a KCL para Java no Apache Maven, consulte o [Repositório central do Maven da KCL](https://central.sonatype.com/artifact/software.amazon.kinesis/amazon-kinesis-client).

**Topics**
+ [Desenvolver consumidores com a KCL em Java](develop-kcl-consumers-java.md)
+ [Desenvolva consumidores com a KCL em linguagens não Java](develop-kcl-consumers-non-java.md)

# Desenvolver consumidores com a KCL em Java
<a name="develop-kcl-consumers-java"></a>

## Pré-requisitos
<a name="develop-kcl-consumers-java-prerequisites"></a>

Antes de começar a usar a KCL 3.x, certifique-se de ter os pré-requisitos apresentados a seguir:
+ Java Development Kit (JDK) 8 ou posterior
+ AWS SDK para Java 2. x
+ Maven ou Gradle para gerenciamento de dependências

A KCL coleta métricas de utilização da CPU, como a utilização da CPU, do host de computação executado pelos operadores para equilibrar a carga e alcançar um nível uniforme de utilização de recursos entre os operadores. Para permitir que a KCL colete métricas de utilização da CPU dos operadores, é necessário que os seguintes pré-requisitos sejam atendidos:

 **Amazon Elastic Compute Cloud(Amazon EC2)**
+ Seu sistema operacional deve ser Linux OS.
+ Você deve habilitar [IMDSv2](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html)em sua instância do EC2.

 **Amazon Elastic Container Service (Amazon ECS) no Amazon EC2**
+ Seu sistema operacional deve ser Linux OS.
+ É necessário ativar o [endpoint de metadados de tarefas do ECS versão 4](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ec2-metadata.html). 
+ Sua versão do atendente do contêiner do Amazon ECS deve ser 1.39.0 ou posterior.

 **Amazon ECS em AWS Fargate**
+ É necessário habilitar o [endpoint de metadados de tarefas do Fargate versão 4](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4-fargate.html). Se você usar a versão da plataforma 1.4.0 ou posterior do Fargate, essa opção fica habilitada por padrão. 
+ Fargate versão da plataforma 1.4.0 ou posterior.

 **Amazon Elastic Kubernetes Service (Amazon EKS) no Amazon EC2** 
+ Seu sistema operacional deve ser Linux OS.

 **Amazon EKS em AWS Fargate**
+ Plataforma 1.3.0 ou posterior do Fargate.

**Importante**  
Se a KCL não conseguir coletar dos operadores as métricas de utilização da CPU, a KCL voltará a usar o throughput por operador para atribuir concessões e equilibrar a carga entre os operadores da frota. Para obter mais informações, consulte [Como a KCL atribui concessões aos operadores e equilibra a carga](kcl-dynamoDB.md#kcl-assign-leases).

## Instalar e adicionar dependências
<a name="develop-kcl-consumers-java-installation"></a>

Se estiver usando Maven, adicione a dependência a seguir ao seu arquivo `pom.xml`. Certifique-se de ter substituído 3.x.x pela versão mais recente da KCL. 

```
<dependency>
    <groupId>software.amazon.kinesis</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>3.x.x</version> <!-- Use the latest version -->
</dependency>
```

Se você estiver usando Gradle, adicione o seguinte ao seu arquivo `build.gradle`. Certifique-se de ter substituído 3.x.x pela versão mais recente da KCL. 

```
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
```

A versão mais recente da KCL pode ser obtida no [Repositório central do Maven](https://search.maven.org/artifact/software.amazon.kinesis/amazon-kinesis-client).

## Implementar o consumidor
<a name="develop-kcl-consumers-java-implemetation"></a>

Uma aplicação de consumo da KCL consiste nos seguintes componentes principais:

**Topics**
+ [RecordProcessor](#implementation-recordprocessor)
+ [RecordProcessorFactory](#implementation-recordprocessorfactory)
+ [Agendador](#implementation-scheduler)
+ [Aplicação de consumo principal](#implementation-main)

### RecordProcessor
<a name="implementation-recordprocessor"></a>

RecordProcessor é o componente principal em que reside sua lógica de negócios para processar registros de stream de dados do Kinesis. Ele define como sua aplicação processa os dados que recebe do fluxo do Kinesis.

Principais responsabilidades:
+ Inicializar o processamento de um fragmento
+ Processar lotes de registros do fluxo do Kinesis
+ Encerrar o processamento de um fragmento (por exemplo, quando o fragmento é dividido ou mesclado ou ainda quando a concessão é entregue a outro host)
+ Tratar do ponto de verificação para acompanhar o progresso

A seguir, um exemplo de implementação:

```
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.*;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class SampleRecordProcessor implements ShardRecordProcessor {
    private static final String SHARD_ID_MDC_KEY = "ShardId";
    private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);
    private String shardId;

    @Override
    public void initialize(InitializationInput initializationInput) {
        shardId = initializationInput.shardId();
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Processing {} record(s)", processRecordsInput.records().size());
            processRecordsInput.records().forEach(r -> 
                log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber())
            );
            
            // Checkpoint periodically
            processRecordsInput.checkpointer().checkpoint();
        } catch (Throwable t) {
            log.error("Caught throwable while processing records. Aborting.", t);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Lost lease, so terminating.");
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Reached shard end checkpointing.");
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at shard end. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        MDC.put(SHARD_ID_MDC_KEY, shardId);
        try {
            log.info("Scheduler is shutting down, checkpointing.");
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
        } finally {
            MDC.remove(SHARD_ID_MDC_KEY);
        }
    }
}
```

Veja a seguir uma explicação detalhada de cada método usado no exemplo:

**inicializar (InitializationInputInitializationInput)**
+ Objetivo: configurar todos os recursos ou estados necessários para processar registros.
+ Quando é chamado: uma vez, quando a KCL atribui um fragmento a esse processador de registros.
+ Principais pontos:
  + `initializationInput.shardId()`: a ID do fragmento que esse processador manipulará.
  + `initializationInput.extendedSequenceNumber()`: o número sequencial a partir do qual o processamento será iniciado.

**processRecords () ProcessRecordsInput processRecordsInput**
+ Objetivo: processar os registros recebidos e, opcionalmente, verificar o progresso.
+ Quando é chamado: repetidamente, desde que o processador de registros mantenha a concessão do fragmento.
+ Principais pontos:
  + `processRecordsInput.records()`: lista de registros a serem processados.
  + `processRecordsInput.checkpointer()`: usado para verificar o progresso.
  + Verifique se todas as exceções foram tratadas durante o processamento para evitar que a KCL falhe.
  + Esse método deve ser idempotente, pois o mesmo registro pode ser processado mais de uma vez em alguns cenários, como dados que não foram verificados antes de falhas ou reinicializações inesperadas do operador.
  + Sempre limpe todos os dados armazenados em buffer antes de verificar para garantir a consistência de dados.

**Locação perdida () LeaseLostInput leaseLostInput**
+ Objetivo: limpar todos os recursos específicos para o processamento desse fragmento.
+ Quando é chamado: quando outro Agendador assume a concessão desse fragmento.
+ Principais pontos:
  + A verificação não é permitida neste método.

**Encerrado () ShardEndedInput shardEndedInput**
+ Objetivo: concluir o processamento desse fragmento e verificar.
+ Quando é chamado: quando o fragmento é dividido ou mesclado, indicando que todos os dados desse fragmento foram processados.
+ Principais pontos:
  + `shardEndedInput.checkpointer()`: usado para realizar a verificação final.
  + A verificação nesse método é obrigatória para concluir o processamento.
  + Deixar de liberar os dados e fazer a verificação aqui pode resultar na perda de dados ou no processamento duplicado quando o fragmento for reaberto.

**Desligamento solicitado () ShutdownRequestedInput shutdownRequestedInput**
+ Objetivo: verifique e limpe os recursos quando a KCL estiver desligada.
+ Quando é chamado: quando a KCL está sendo encerrada. Por exemplo, quando a aplicação está sendo encerrada).
+ Principais pontos:
  + `shutdownRequestedInput.checkpointer()`: usado para realizar o ponto de verificação antes do desligamento.
  + Implemente o ponto de verificação no método para que o andamento seja salvo antes que a aplicação pare.
  + A falha na liberação dos dados e na implementação do ponto de verificação aqui pode resultar na perda de dados ou no reprocessamento de registros quando a aplicação for reiniciada.

**Importante**  
A KCL 3.x garante menos reprocessamento de dados quando a concessão é passada de um operador para outro por meio de um ponto de verificação antes que o operador anterior seja desligado. Se não implementar a lógica do ponto de verificação no método `shutdownRequested()`, você não obterá esse benefício. Implemente uma lógica de ponto de verificação dentro do método `shutdownRequested()`.

### RecordProcessorFactory
<a name="implementation-recordprocessorfactory"></a>

RecordProcessorFactory é responsável pela criação de novas RecordProcessor instâncias. A KCL usa essa fábrica para criar um novo RecordProcessor para cada fragmento que o aplicativo precisa processar.

Principais responsabilidades:
+ Crie novas RecordProcessor instâncias sob demanda
+ Certifique-se de que cada um RecordProcessor esteja inicializado corretamente

A seguir, um exemplo de implementação:

```
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new SampleRecordProcessor();
    }
}
```

Neste exemplo, a fábrica cria um novo SampleRecordProcessor cada vez que shardRecordProcessor () é chamado. Isso pode ser estendido para incluir qualquer lógica de inicialização necessária.

### Agendador
<a name="implementation-scheduler"></a>

O Agendador é um componente de alto nível que coordena todas as atividades da aplicação KCL. Ele é responsável pela orquestração geral do processamento de dados.

Principais responsabilidades:
+ Gerencie o ciclo de vida do RecordProcessors
+ Gerenciar o gerenciamento de concessão para fragmentos
+ Coordenar os pontos de verificação
+ Equilibrar a carga de processamento de fragmentos entre vários operadores da sua aplicação
+ Gerenciar os sinais de desligamento normal e encerramento da aplicação

Normalmente, o Agendador é criado e iniciado na aplicação principal. Você pode verificar o exemplo de implementação do Agendador na seção a seguir: Aplicação de consumo principal. 

### Aplicação de consumo principal
<a name="implementation-main"></a>

A aplicação de consumo principal une todos os componentes. Ela é responsável por configurar o consumidor da KCL, criar os clientes necessários, configurar o Agendador e gerenciar o ciclo de vida da aplicação.

Principais responsabilidades:
+ Configurar clientes AWS de serviço (Kinesis, DynamoDB,) CloudWatch
+ Configurar a aplicação KCL
+ Criar e iniciar o Agendador
+ Controlar o desligamento da aplicação

A seguir, um exemplo de implementação:

```
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import java.util.UUID;

public class SampleConsumer {
    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    public SampleConsumer(String streamName, Region region) {
        this.streamName = streamName;
        this.region = region;
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    public void run() {
        DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        
        ConfigsBuilder configsBuilder = new ConfigsBuilder(
            streamName, 
            streamName, 
            kinesisClient, 
            dynamoDbAsyncClient,
            cloudWatchClient, 
            UUID.randomUUID().toString(), 
            new SampleRecordProcessorFactory()
        );

        Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();
    }

    public static void main(String[] args) {
        String streamName = "your-stream-name"; // replace with your stream name
        Region region = Region.US_EAST_1; // replace with your region
        new SampleConsumer(streamName, region).run();
    }
}
```

 A KCL cria um consumidor de distribuição avançada (Enhanced Fan-out, EFO) com throughput dedicada por padrão. Para obter mais informações sobre distribuição avançada, consulte [Desenvolver consumidores de distribuição avançada com throughput dedicado](enhanced-consumers.md). Se você tiver menos de 2 consumidores ou não precisar de atrasos de propagação de leitura abaixo de 200 ms, defina a seguinte configuração no objeto do agendador para usar consumidores de throughput compartilhada:

```
configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
```

O código a seguir é um exemplo de criação de um objeto do agendador que usa consumidores de throughput compartilhada:

**Importações**:

```
import software.amazon.kinesis.retrieval.polling.PollingConfig;
```

**Código**:

```
Scheduler scheduler = new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );/
```

# Desenvolva consumidores com a KCL em linguagens não Java
<a name="develop-kcl-consumers-non-java"></a>

Esta seção aborda a implementação de consumidores que usam a Kinesis Client Library (KCL) em Python, Node.js, .NET e Ruby.

A KCL é uma biblioteca Java. O suporte para linguagens diferentes de Java é fornecido usando uma interface multilíngue chamada `MultiLangDaemon`. Esse daemon baseado em Java é executado em segundo plano quando uma KCL com uma linguagem diferente de Java é utilizada. Portanto, se você instalar a KCL para linguagens não Java e criar a aplicação de consumo inteiramente em linguagens não Java, ainda assim você precisará ter Java instalado no sistema por causa do `MultiLangDaemon`. Além disso, o `MultiLangDaemon` tem algumas configurações padrão que você pode precisar para personalizar de acordo com seu caso de uso (por exemplo, a região da AWS à qual ele se conecta). Para obter mais informações sobre o `MultiLangDaemon` on GitHub, consulte o [ MultiLangDaemon projeto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Embora os conceitos principais permaneçam os mesmos em todas as linguagens, existem algumas considerações e implementações específicas de uma linguagem. Para obter os principais conceitos sobre o desenvolvimento de consumo da KCL, consulte [Desenvolver consumidores com a KCL em Java](develop-kcl-consumers-java.md). Para obter informações mais detalhadas sobre como desenvolver consumidores de KCL em Python, Node.js, .NET e Ruby e as atualizações mais recentes, consulte os seguintes repositórios: GitHub 
+ Python: [amazon-kinesis-client-python](https://github.com/awslabs/amazon-kinesis-client-python)
+ Node.js: [amazon-kinesis-client-nodejs](https://github.com/awslabs/amazon-kinesis-client-nodejs)
+ .NET: [amazon-kinesis-client-net](https://github.com/awslabs/amazon-kinesis-client-net)
+ Ruby: [amazon-kinesis-client-ruby](https://github.com/awslabs/amazon-kinesis-client-ruby)

**Importante**  
Não use as versões não Java da KCL a seguir se você estiver usando o JDK 8. Essas versões contêm uma dependência (logback) que é incompatível com o JDK 8.  
KCL Python 3.0.2 e 2.2.0
KCL Node.js 2.3.0
KCL .NET 3.1.0
KCL Ruby 2.2.0
Recomendamos que você use as versões lançadas antes ou depois dessas versões afetadas ao trabalhar com o JDK 8.

# Processamento de vários fluxos com a KCL
<a name="kcl-multi-stream"></a>

Esta seção descreve as alterações necessárias na KCL que permitem criar aplicações de consumo da KCL que podem processar mais de um fluxo de dados ao mesmo tempo.
**Importante**  
O processamento de vários fluxos é compatível somente na KCL 2.3 ou versão posterior.
O processamento de vários fluxos *não* será compatível se houver consumidores da KCL criados em linguagens não Java e executados com `multilangdaemon`.
O processamento de vários fluxos *não* tem suporte em nenhuma versão da KCL 1.x.
+ **MultistreamTracker interface**
  + Para criar um aplicativo de consumidor que possa processar vários fluxos ao mesmo tempo, você deve implementar uma nova interface chamada [MultistreamTracker](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java). Essa interface inclui o método `streamConfigList`, que retorna a lista de fluxos de dados, e suas configurações, a serem processados pela aplicação de consumo da KCL. Observe que os fluxos de dados processados podem ser alterados durante o runtime da aplicação de consumo. `streamConfigList` é chamado periodicamente pela KCL para obter informações das mudanças nos fluxos de dados a serem processados.
  + O `streamConfigList` preenche a [StreamConfig](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java#L23)lista.

  ```
  package software.amazon.kinesis.common;
  
  import lombok.Data;
  import lombok.experimental.Accessors;
  
  @Data
  @Accessors(fluent = true)
  public class StreamConfig {
      private final StreamIdentifier streamIdentifier;
      private final InitialPositionInStreamExtended initialPositionInStreamExtended;
      private String consumerArn;
  }
  ```
  + Os campos `StreamIdentifier` e `InitialPositionInStreamExtended` são obrigatórios, enquanto `consumerArn` é opcional. Só é necessário fornecer `consumerArn` se você estiver usando a KCL para implementar uma aplicação de consumo de distribuição avançada.
  + Para obter mais informações sobre`StreamIdentifier`, consulte [https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java \$1L129](https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129). Para criar um `StreamIdentifier`, recomenda-se a criação de uma instância de vários fluxos a partir do `streamArn` e do `streamCreationEpoch` que esteja disponível na KCL 2.5.0 ou versões posteriores. Na KCL v2.3 e v2.4, que não oferecem suporte ao `streamArm`, crie uma instância multifluxo usando o formato `account-id:StreamName:streamCreationTimestamp`. Esse formato será descontinuado e não terá mais suporte a partir da próxima versão principal.
  +  MultistreamTracker também inclui uma estratégia para excluir locações de fluxos antigos na tabela de locação (). formerStreamsLeases DeletionStrategy Observe que a estratégia NÃO PODE ser alterada durante o runtime da aplicação de consumo. Para obter mais informações, consulte [https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0 b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy .java](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java).
+   [ConfigsBuilder](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java)é uma classe de todo o aplicativo que você pode usar para especificar todas as configurações da KCL a serem usadas ao criar seu aplicativo consumidor da KCL para a versão 2.x ou posterior da KCL. `ConfigsBuilder`a classe agora tem suporte para a `MultistreamTracker` interface. Você pode inicializar ConfigsBuilder com o nome do único fluxo de dados do qual consumir registros: 

  ```
  /**
       * Constructor to initialize ConfigsBuilder with StreamName
       * @param streamName
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.right(streamName);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```  

Ou você pode inicializar ConfigsBuilder com `MultiStreamTracker` se quiser implementar um aplicativo consumidor KCL que processe vários fluxos ao mesmo tempo.

```
* Constructor to initialize ConfigsBuilder with MultiStreamTracker
     * @param multiStreamTracker
     * @param applicationName
     * @param kinesisClient
     * @param dynamoDBClient
     * @param cloudWatchClient
     * @param workerIdentifier
     * @param shardRecordProcessorFactory
     */
    public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName,
            @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
            @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
            @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
        this.appStreamTracker = Either.left(multiStreamTracker);
        this.applicationName = applicationName;
        this.kinesisClient = kinesisClient;
        this.dynamoDBClient = dynamoDBClient;
        this.cloudWatchClient = cloudWatchClient;
        this.workerIdentifier = workerIdentifier;
        this.shardRecordProcessorFactory = shardRecordProcessorFactory;
    }
```
+ Com o suporte para vários fluxos implementado para sua aplicação de consumo da KCL, cada linha da tabela de concessões da aplicação contém agora a ID do fragmento e o nome do fluxo dos vários fluxos de dados que esta aplicação processa.
+ Quando o suporte para vários fluxos para sua aplicação de consumo da KCL é implementado, leaseKey assume a estrutura `account-id:StreamName:streamCreationTimestamp:ShardId`. Por exemplo, .`111111111:multiStreamTest-1:12345:shardId-000000000336`

**Importante**  
Quando sua aplicação de consumo da KCL atual está configurada para processar somente um fluxo de dados, a `leaseKey` (que é a chave de partição da tabela de concessões) é a ID do fragmento. Se você reconfigurar uma aplicação de consumo da KCL atual para processar vários fluxos de dados, a tabela de concessões será quebrada, pois a estrutura da `leaseKey` deve ser `account-id:StreamName:StreamCreationTimestamp:ShardId` para oferecer suporte a vários fluxos.

# Use o registro do AWS Glue esquema com o KCL
<a name="kcl-glue-schema"></a>

Você pode integrar o Kinesis Data Streams AWS Glue ao registro do esquema. O registro do AWS Glue esquema permite que você descubra, controle e desenvolva esquemas de forma centralizada, ao mesmo tempo em que garante que os dados produzidos sejam validados continuamente por um esquema registrado. O esquema define a estrutura e o formato de um registro de dados. Um esquema é uma especificação versionada para publicação, consumo ou datastore confiáveis. O registro do AWS Glue esquema permite que você melhore a qualidade end-to-end dos dados e a governança de dados em seus aplicativos de streaming. Para obter mais informações, consulte [Registro de esquemas do AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). Uma das formas de configurar essa integração é usando a KCL para Java.

**Importante**  
AWS Glue A integração do registro de esquemas para o Kinesis Data Streams só é suportada no KCL 2.3 ou posterior.
AWS Glue A integração do registro de esquemas para o Kinesis Data Streams não *é* compatível com consumidores de KCL escritos em linguagens não Java que são executadas com. `multilangdaemon`
AWS Glue A integração do registro de esquemas para o Kinesis Data Streams não *é* suportada em nenhuma versão do KCL 1.x.

Para obter instruções detalhadas sobre como configurar a integração do Kinesis Data Streams com o registro do AWS Glue esquema usando o KCL, consulte a seção “Interagindo com dados usando as KPL/KCL bibliotecas” em Caso de [uso: integração do Amazon Kinesis Data Streams](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds) com o Schema Registry. AWS Glue 

# Permissões do IAM necessárias para aplicativos de consumo da KCL
<a name="kcl-iam-permissions"></a>

 É necessário adicionar as permissões a seguir para o perfil do IAM ou o usuário associado à aplicação de consumo da KCL. 

 Práticas recomendadas de segurança para AWS ditar o uso de permissões refinadas para controlar o acesso a diferentes recursos. AWS Identity and Access Management (IAM) permite gerenciar usuários e permissões de usuários no AWS. Uma Política do IAM lista explicitamente as ações permitidas e os recursos aos quais as ações são aplicáveis.

A tabela a seguir mostra as permissões mínimas do IAM geralmente necessárias para aplicações de consumo da KCL:


**Permissões do IAM mínimas para aplicações de consumo da KCL**  

| Serviço | Ações | Recursos (ARNs) | Finalidade | 
| --- | --- | --- | --- | 
| Amazon Kinesis Data Streams |  `DescribeStream` `DescribeStreamSummary` `RegisterStreamConsumer`  |  O fluxo de dados do Kinesis a partir do qual sua aplicação da KCL processará os dados.`arn:aws:kinesis:region:account:stream/StreamName`  |  Antes de tentar ler registros, o consumidor verifica se o fluxo de dados existe, se está ativo e se os fragmentos estão contidos no fluxo de dados. Registra os consumidores em um fragmento.  | 
| Amazon Kinesis Data Streams |  `GetRecords` `GetShardIterator` `ListShards`  | Fluxo de dados do Kinesis a partir do qual sua aplicação da KCL processará os dados.`arn:aws:kinesis:region:account:stream/StreamName` |  Lê registros de um fragmento.  | 
| Amazon Kinesis Data Streams |  `SubscribeToShard` `DescribeStreamConsumer` |  Fluxo de dados do Kinesis a partir do qual sua aplicação da KCL processará os dados. Adicione essa ação somente se você usar consumidores de distribuição avançada (EFO). `arn:aws:kinesis:region:account:stream/StreamName/consumer/*`  |  Assina um fragmento para consumidores de distribuição avançada (EFO).  | 
| Amazon DynamoDB |  `CreateTable` `DescribeTable` `UpdateTable` `Scan` `GetItem` `PutItem` `UpdateItem` `DeleteItem`  |  Tabela de concessão (tabela de metadados no DynamoDB criada pela KCL). `arn:aws:dynamodb:region:account:table/KCLApplicationName`  |  Essas ações são necessárias para que a KCL gerencie a tabela de concessões criada no DynamoDB.  | 
| Amazon DynamoDB |  `CreateTable` `DescribeTable` `Scan` `GetItem` `PutItem` `UpdateItem` `DeleteItem`  |  Métricas do operador e tabela de estados do coordenador (tabelas de metadados no DynamoDB) criada pela KCL. `arn:aws:dynamodb:region:account:table/KCLApplicationName-WorkerMetricStats` `arn:aws:dynamodb:region:account:table/KCLApplicationName-CoordinatorState`  |  Essas ações são necessárias para que a KCL gerencie as métricas do operador e as tabelas de metadados do estado do coordenador no DynamoDB.  | 
| Amazon DynamoDB | `Query` |  Índice secundário global na tabela de concessões. `arn:aws:dynamodb:region:account:table/KCLApplicationName/index/*`  |  Essa ação é necessária para que a KCL leia o índice secundário global da tabela de concessão criada no DynamoDB.  | 
| Amazon CloudWatch | `PutMetricData` |  \$1  |  Faça upload de métricas CloudWatch que sejam úteis para monitorar o aplicativo. O asterisco (\$1) é usado porque não há nenhum recurso específico CloudWatch no qual a `PutMetricData` ação seja invocada.   | 

**nota**  
Substitua “região”, “conta”StreamName, "e" KCLApplication Nome” no por seu próprio Conta da AWS número Região da AWS, nome do ARNs stream de dados Kinesis e nome do aplicativo KCL, respectivamente. A KCL 3.x cria mais duas tabelas de metadados no DynamoDB. Para obter detalhes sobre as tabelas de metadados do DynamoDB criadas pela KCL, consulte [Tabelas de metadados do DynamoDB e balanceamento de carga na KCL](kcl-dynamoDB.md). Se você usar as configurações para personalizar os nomes das tabelas de metadados criadas pela KCL, use esses nomes de tabela especificados em vez do nome da aplicação da KCL. 

Um exemplo de documento de política para um aplicação de consumo da KCL é mostrado a seguir. 

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer",
                "kinesis:GetRecords",
                "kinesis:GetShardIterator",
                "kinesis:ListShards"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/STREAM_NAME"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamConsumer"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/STREAM_NAME/consumer/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DescribeTable",
                "dynamodb:UpdateTable",
                "dynamodb:GetItem",
                "dynamodb:UpdateItem",
                "dynamodb:PutItem",
                "dynamodb:DeleteItem",
                "dynamodb:Scan"
            ],
            "Resource": [
            "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DescribeTable",
                "dynamodb:GetItem",
                "dynamodb:UpdateItem",
                "dynamodb:PutItem",
                "dynamodb:DeleteItem",
                "dynamodb:Scan"
            ],
            "Resource": [
            "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME-WorkerMetricStats",
    "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME-CoordinatorState"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:Query"
            ],
            "Resource": [
            "arn:aws:dynamodb:us-east-1:123456789012:table/KCL_APPLICATION_NAME/index/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": "*"
        }
    ]
}
```

------

Antes de usar esta política de exemplo, verifique os seguintes itens:
+ Substitua REGION pela sua Região da AWS (por exemplo, us-east-1).
+ Substitua ACCOUNT\$1ID pelo seu Conta da AWS ID.
+ Substitua STREAM\$1NAME pelo nome do seu fluxo de dados do Kinesis.
+ Substitua CONSUMER\$1NAME pelo nome do seu consumidor, normalmente o nome da sua aplicação ao usar a KCL.
+ Substitua KCL\$1APPLICATION\$1NAME pelo nome da aplicação da KCL.

# Configurações da KCL
<a name="kcl-configuration"></a>

Você pode definir as propriedades de configuração para personalizar a funcionalidade da Kinesis Client Library para atender às suas necessidades específicas. A tabela a seguir descreve as classes e propriedades de configuração.

**Importante**  
Na KCL 3.x, o algoritmo de balanceamento de carga visa a alcançar uma utilização uniforme da CPU entre os operadores, e não um número igual de concessões por operador. Se a configuração `maxLeasesForWorker` for muito baixa, será possível limitar a capacidade da KCL de equilibrar a workload de forma eficaz. Se você usar a configuração `maxLeasesForWorker`, considere aumentar seu valor para permitir a melhor distribuição de carga possível.


**Esta tabela mostra as propriedades de configuração da KCL**  

| Propriedade de configuração | Classe de configuração | Description | Valor padrão  | 
| --- | --- | --- | --- | 
| applicationName | ConfigsBuilder | O nome da aplicação da KCL. Usado como padrão para o tableName e o consumerName. | Não aplicável | 
| tableName | ConfigsBuilder |  Permite substituir o nome usado para a tabela de concessão do Amazon DynamoDB.  | Não aplicável | 
| streamName | ConfigsBuilder |  O nome do fluxo a partir do qual esse aplicativo processa registros.  | Não aplicável | 
| workerIdentifier | ConfigsBuilder |  Um identificador exclusivo que representa a instanciação do processador do aplicativo. Isso deve ser exclusivo.  | Não aplicável | 
| failoverTimeMillis | LeaseManagementConfig |  O número de milissegundos que devem passar antes que se considere uma falha do proprietário da concessão. No caso de aplicações que têm um grande número de fragmentos, um número maior pode ser definido para reduzir o número de IOPS do DynamoDB necessário para rastrear as concessões.  | 10.000 (10 segundos) | 
| shardSyncIntervalMillis | LeaseManagementConfig |  O tempo entre as chamadas de sincronização de fragmentos.  | 60.000 (60 segundos) | 
| cleanupLeasesUponShardCompletion | LeaseManagementConfig |  Quando definidas, as concessões são removidas assim que as concessões filho iniciam o processamento.  | TRUE | 
| ignoreUnexpectedChildShards | LeaseManagementConfig |  Quando definidos, fragmentos filho que possuem um fragmento aberto são ignorados. Essa configuração destina-se principalmente a fluxos do DynamoDB.  | FALSE | 
| maxLeasesForWorker | LeaseManagementConfig |  O número máximo de concessões que um único operador deve aceitar. Se esse número for definido muito baixo, isso pode causar perda de dados se os operadores não conseguirem processar todos os fragmentos, levando a uma atribuição de concessões abaixo do ideal entre os operadores. Considere a contagem total de fragmentos, o número de operadores e a capacidade de processamento do operador ao configurá-lo.  | Ilimitado | 
| maxLeaseRenewalThreads | LeaseManagementConfig |  Controla o tamanho do grupo de threads de renovação de concessão. Quanto mais concessões seu aplicativo aceitar, maior esse grupo deve ser.  | 20 | 
| billingMode | LeaseManagementConfig |  Determina o modo de capacidade da tabela de concessões criada no DynamoDB. Há duas opções: modo sob demanda (PAY\$1PER\$1REQUEST) e modo provisionado. Recomendamos usar a configuração padrão do modo sob demanda, pois ela é escalada automaticamente para acomodar sua workload sem a necessidade de planejamento de capacidade.  | PAY\$1PER\$1REQUEST (modo sob demanda) | 
| initialLeaseTableReadCapacity | LeaseManagementConfig | A capacidade de leitura do DynamoDB que será usada se a Kinesis Client Library precisar criar uma nova tabela de concessões do DynamoDB com o modo de capacidade provisionada. Você pode ignorar essa configuração se estiver usando o modo de capacidade sob demanda padrão na configuração billingMode. | 10 | 
| initialLeaseTableWriteCapacity | LeaseManagementConfig | A capacidade de leitura do DynamoDB que será usada se a Kinesis Client Library precisar criar uma nova tabela de concessões do DynamoDB. Você pode ignorar essa configuração se estiver usando o modo de capacidade sob demanda padrão na configuração billingMode. | 10 | 
| initialPositionInStreamExtended | LeaseManagementConfig |  A posição inicial do aplicativo no fluxo. Isso é usado somente durante a criação da concessão inicial.  |  InitialPositionInStream.TRIM\$1HORIZON  | 
| reBalanceThresholdPercentage | LeaseManagementConfig |  Um valor percentual que define quando o algoritmo de balanceamento de carga deve considerar a reatribuição de fragmentos entre os operadores. Essa é uma nova configuração introduzida na KCL 3.x.  | 10 | 
| dampeningPercentage | LeaseManagementConfig |  Um valor percentual usado para amortecer a quantidade de carga que será movida do operador sobrecarregado em uma única operação de rebalanceamento. Essa é uma nova configuração introduzida na KCL 3.x.  | 60 | 
| allowThroughputOvershoot | LeaseManagementConfig |  Determina se a concessão adicional ainda precisa ser obtida do operador sobrecarregado, mesmo que isso faça com que a quantidade total de throughput da concessão exceda a quantidade de throughput desejada. Essa é uma nova configuração introduzida na KCL 3.x.  | TRUE | 
| disableWorkerMetrics | LeaseManagementConfig |  Determina se a KCL deve ignorar as métricas de recursos dos operadores (como a utilização da CPU) ao reatribuir concessões e balancear a carga. Defina isso como TRUE se quiser evitar que a KCL balanceie a carga com base na utilização da CPU. Essa é uma nova configuração introduzida na KCL 3.x.  | FALSE | 
| maxThroughputPerHostKBps | LeaseManagementConfig |  Quantidade de throughput máxima a ser atribuída a um operador durante a atribuição da concessão. Essa é uma nova configuração introduzida na KCL 3.x.  | Ilimitado | 
| isGracefulLeaseHandoffEnabled | LeaseManagementConfig |  Controla o comportamento da transferência de concessões entre os operadores. Quando definido como verdadeiro, a KCL tentará transferir os arrendamentos normalmente, permitindo que o fragmento RecordProcessor tenha tempo suficiente para concluir o processamento antes de entregar o contrato a outro trabalhador. Isso pode ajudar a garantir integridade dos dados e transições suaves, mas pode aumentar o tempo de transferência. Quando definido como falso, o contrato será entregue imediatamente, sem esperar que o RecordProcessor contrato seja encerrado normalmente. Isso pode levar a transferências mais rápidas, mas pode causar um processamento incompleto. Nota: O ponto de verificação deve ser implementado dentro do método shutdownRequested () do RecordProcessor para se beneficiar do recurso elegante de transferência de locação. Essa é uma nova configuração introduzida na KCL 3.x.  | TRUE | 
| gracefulLeaseHandoffTimeoutMillis | LeaseManagementConfig |  Especifica o tempo mínimo (em milissegundos) para esperar que os fragmentos atuais sejam encerrados normalmente antes de transferir o contrato RecordProcessor à força para o próximo proprietário. Se seu método processRecords é executado comumente por mais tempo do que o valor padrão, considere aumentar essa configuração. Isso garante que RecordProcessor tenha tempo suficiente para concluir seu processamento antes que a transferência da locação ocorra. Essa é uma nova configuração introduzida na KCL 3.x.  | 30.000 (30 segundos) | 
| maxRecords | PollingConfig |  Permite definir o número máximo de registros que o Kinesis retorna.  | 10.000 | 
| retryGetRecordsInSeconds | PollingConfig |  Configura o atraso entre as GetRecords tentativas de falhas.  | Nenhum | 
| maxGetRecordsThreadPool | PollingConfig |  O tamanho do pool de fios usado para GetRecords.  | Nenhum | 
| idleTimeBetweenReadsInMillis | PollingConfig |  Determina quanto tempo a KCL espera entre as GetRecords chamadas para pesquisar os dados dos fluxos de dados. A unidade é milissegundos.  | 1.500 | 
| callProcessRecordsEvenForEmptyRecordList | ProcessorConfig |  Quando definido, o processador de registros é chamado mesmo quando o Kinesis não fornece nenhum registro.  | FALSE | 
| parentShardPollIntervalMillis | CoordinatorConfig |  Com que frequência um processador de registros deve sondar a conclusão de fragmentos pai. A unidade é milissegundos.  | 10.000 (10 segundos) | 
| skipShardSyncAtWorkerInitializationIfLeaseExist | CoordinatorConfig |  Desative a sincronização de dados de fragmento se a tabela de concessão contiver concessões existentes.  |  FALSE  | 
| shardPrioritization | CoordinatorConfig |  A priorização de fragmentos a ser usada.  |  NoOpShardPrioritization  | 
| ClientVersionConfig | CoordinatorConfig |  Determina em qual modo de compatibilidade de versão da KCL a aplicação será executada. Essa configuração serve somente para a migração de versões anteriores da KCL. Ao migrar para 3.x, é preciso definir essa configuração como `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X`. Essa configuração pode ser removida quando a migração for concluída.  | CLIENT\$1VERSION\$1CONFIG\$13X | 
| taskBackoffTimeMillis | LifecycleConfig |  Tempo de espera para repetir tarefas da KCL com falha. A unidade é milissegundos.  | 500 (0,5 segundo) | 
| logWarningForTaskAfterMillis | LifecycleConfig |  Quanto tempo esperar antes de um aviso ser registrado caso uma tarefa não seja concluída.  | Nenhum | 
| listShardsBackoffTimeInMillis | RetrievalConfig | O número de milissegundos de espera entre as chamadas para ListShards em caso de falha. A unidade é milissegundos. | 1.500 (1,5 segundo) | 
| maxListShardsRetryAttempts | RetrievalConfig | O número máximo de novas tentativas de ListShards antes de desistir. | 50 | 
| metricsBufferTimeMillis | MetricsConfig |  Especifica a duração máxima (em milissegundos) para armazenar métricas em buffer antes de publicá-las em. CloudWatch  | 10.000 (10 segundos) | 
| metricsMaxQueueSize | MetricsConfig |  Especifica o número máximo de métricas a serem armazenadas em buffer antes da publicação. CloudWatch  | 10.000 | 
| metricsLevel | MetricsConfig |  Especifica o nível de granularidade das CloudWatch métricas a serem ativadas e publicadas.  Valores possíveis: NENHUM, RESUMO, DETALHADO.  |  MetricsLevel.DETALHADO  | 
| metricsEnabledDimensions | MetricsConfig |  Controla as dimensões permitidas para CloudWatch métricas.  | Todas as dimensões | 

**Configurações descontinuadas na KCL 3.x**

As seguintes propriedades de configuração foram descontinuadas na KCL 3.x:


**A tabela mostra as propriedades de configuração descontinuadas da KCL 3.x**  

| Propriedade de configuração | Classe de configuração | Description | 
| --- | --- | --- | 
| maxLeasesToStealAtOneTime | LeaseManagementConfig |  O número máximo de concessões que um aplicativo deve tentar roubar de uma só vez. A KCL 3.x ignorará essa configuração e reatribuirá as concessões com base na utilização de recursos dos operadores.  | 
| enablePriorityLeaseAssignment | LeaseManagementConfig |  Controla se os operadores devem priorizar concessões muito expiradas (concessões não renovadas por 3 vezes o tempo de failover) e novas concessões de fragmentos, independentemente do número de concessões pretendidas, mas ainda respeitando os limites máximos de concessões. A KCL 3.x ignorará essa configuração e sempre distribuirá as concessões expiradas entre os operadores.  | 

**Importante**  
Você precisa ter as propriedades de configuração descontinuadas durante a migração das versões anteriores da KCL para a KCL 3.x. Durante a migração, o operador da KCL iniciará primeiro com o modo compatível com KCL 2.x e passará para o modo de funcionalidade da KCL 3.x quando detectar que todos os operadores da KCL da aplicação estão prontos para executar a KCL 3.x. Essas configurações descontinuadas são necessárias enquanto os operadores da KCL estiverem executando o modo compatível com a KCL 2.x.

# Política de ciclo de vida da versão da KCL
<a name="kcl-version-lifecycle-policy"></a>

Este tópico descreve a política de ciclo de vida da versão para a Amazon Kinesis Client Library (KCL). AWS fornece regularmente novos lançamentos para as versões KCL para oferecer suporte a novos recursos e aprimoramentos, correções de erros, patches de segurança e atualizações de dependências. Recomendamos que você continue up-to-date com as versões do KCL para acompanhar os recursos, as atualizações de segurança e as dependências subjacentes mais recentes. **Não** recomendamos o uso contínuo de uma versão não compatível da KCL.

O ciclo de vida das principais versões da KCL consiste nas três fases a seguir:
+ **Disponibilidade geral (GA)** — Durante essa fase, a versão principal é totalmente suportada. AWS fornece lançamentos regulares de versões secundárias e de patches que incluem suporte para novos recursos ou atualizações de API para o Kinesis Data Streams, bem como correções de bugs e segurança.
+ **Modo de manutenção** — AWS limita os lançamentos da versão do patch para tratar apenas de correções críticas de bugs e problemas de segurança. A versão principal não receberá atualizações dos novos recursos ou APIs do Kinesis Data Streams.
+ **E nd-of-support** — A versão principal não receberá mais atualizações ou lançamentos. As versões publicadas anteriormente continuarão disponíveis por meio de gerenciadores de pacotes públicos e o código permanecerá ativado GitHub. O uso de uma versão que chegou end-of-support é feito a critério do usuário. Recomendamos que você faça a atualização para a versão principal mais recente.


| Versão principal | Fase atual | Data de lançamento | Data do modo de manutenção | End-of-support encontro | 
| --- | --- | --- | --- | --- | 
| KCL 1.x | Modo de manutenção | 19/12/2013 | 2025-04-17 | 2026-01-30 | 
| KCL 2.x | Disponibilidade geral | 2018-08-02 | -- | -- | 
| KCL 3.x | Disponibilidade geral | 2024-11-06 | -- | -- | 

# Migrar de versões anteriores da KCL
<a name="kcl-migration-previous-versions"></a>

Este tópico explica como migrar das versões anteriores da Kinesis Client Library (KCL). 

## Quais são as novidades da KCL 3.0?
<a name="kcl-migration-new-3-0"></a>

A Kinesis Client Library (KCL) 3.0 conta com vários aprimoramentos importantes em comparação com as versões anteriores:
+  ela reduz os custos de computação para aplicações de consumo ao redistribuir automaticamente o trabalho de operadores com utilização acima da capacidade para os operadores subutilizados na frota de aplicações de consumo. Esse novo algoritmo de balanceamento de carga garante que a utilização da CPU seja distribuída de modo uniforme entre os operadores e elimina a necessidade do provisionamento excessivo de operadores.
+  Ele reduz o custo do DynamoDB associado à KCL ao otimizar as operações de leitura na tabela de concessões.
+ Ele minimiza o reprocessamento de dados quando as concessões são transferidas para outro operador ao permitir que o operador atual conclua a verificação dos registros processados.
+  Ele é usado AWS SDK for Java 2.x para melhorar o desempenho e os recursos de segurança, removendo totalmente a dependência do AWS SDK para Java 1.x.

Para mais informações, consulte a [nota de lançamento da KCL 3.0](https://github.com/awslabs/amazon-kinesis-client/blob/master/CHANGELOG.md).

**Topics**
+ [Quais são as novidades da KCL 3.0?](#kcl-migration-new-3-0)
+ [Migrar da KCL 2.x para a KCL 3.x](kcl-migration-from-2-3.md)
+ [Reverter para a versão anterior da KCL](kcl-migration-rollback.md)
+ [Avançar para a KCL 3.x após uma reversão](kcl-migration-rollforward.md)
+ [Práticas recomendadas para a tabela de concessões com modo de capacidade provisionada](kcl-migration-lease-table.md)
+ [Migrar da KCL 1.x para a KCL 3.x](kcl-migration-1-3.md)

# Migrar da KCL 2.x para a KCL 3.x
<a name="kcl-migration-from-2-3"></a>

Este tópico fornece step-by-step instruções para migrar seu consumidor do KCL 2.x para o KCL 3.x. A KCL 3.x oferece suporte à migração local de consumidores da KCL 2.x. Você pode continuar consumindo os dados do seu fluxo de dados do Kinesis enquanto migra seus operadores de forma contínua.

**Importante**  
A KCL 3.x mantém as mesmas interfaces e métodos da KCL 2.x. Portanto, você não precisa atualizar seu código de processamento de registros durante a migração. No entanto, é necessário definir a configuração adequada e verificar as etapas necessárias para a migração. É altamente recomendável que você siga as etapas de migração a seguir para uma experiência de migração perfeita.

## Etapa 1: pré-requisitos
<a name="kcl-migration-from-2-3-prerequisites"></a>

Antes de começar a usar a KCL 3.x, certifique-se de ter os pré-requisitos apresentados a seguir:
+ Java Development Kit (JDK) 8 ou posterior
+ AWS SDK para Java 2. x
+ Maven ou Gradle para gerenciamento de dependências

**Importante**  
Não use as AWS SDK para Java versões 2.27.19 a 2.27.23 com KCL 3.x. Essas versões têm um problema que causa um erro de exceção relacionado ao uso do DynamoDB da KCL. Recomendamos que você use a AWS SDK para Java versão 2.28.0 ou posterior para evitar esse problema. 

## Etapa 2: adicionar dependências
<a name="kcl-migration-from-2-3-dependencies"></a>

Se estiver usando Maven, adicione a dependência a seguir ao seu arquivo `pom.xml`. Certifique-se de ter substituído 3.x.x pela versão mais recente da KCL. 

```
<dependency>
    <groupId>software.amazon.kinesis</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>3.x.x</version> <!-- Use the latest version -->
</dependency>
```

Se você estiver usando Gradle, adicione o seguinte ao seu arquivo `build.gradle`. Certifique-se de ter substituído 3.x.x pela versão mais recente da KCL. 

```
implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'
```

A versão mais recente da KCL pode ser obtida no [Repositório central do Maven](https://search.maven.org/artifact/software.amazon.kinesis/amazon-kinesis-client).

## Etapa 3: definir a configuração relacionada à migração
<a name="kcl-migration-from-2-3-configuration"></a>

Para migrar da KCL 2.x para a KCL 3.x, é necessário definir o seguinte parâmetro de configuração:
+ CoordinatorConfig. clientVersionConfig: essa configuração determina em qual modo de compatibilidade de versão do KCL o aplicativo será executado. Ao migrar da KCL 2.x para a 3.x, você precisa definir essa configuração como `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X`. Para definir a configuração, adicione a seguinte linha ao criar seu objeto do agendador:

```
configsBuilder.coordiantorConfig().clientVersionConfig(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPLATIBLE_WITH_2X)
```

Veja a seguir um exemplo de como configurar `CoordinatorConfig.clientVersionConfig` para migrar da KCL 2.x para a 3.x. Você pode ajustar outras configurações conforme necessário, com base em seus requisitos específicos:

```
Scheduler scheduler = new Scheduler(
    configsBuilder.checkpointConfig(),
    configsBuilder.coordiantorConfig().clientVersionConfig(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPLATIBLE_WITH_2X),
    configsBuilder.leaseManagementConfig(),
    configsBuilder.lifecycleConfig(),
    configsBuilder.metricsConfig(),
    configsBuilder.processorConfig(),
    configsBuilder.retrievalConfig()
);
```

É importante que todos os operadores em sua aplicação de consumo usem o mesmo algoritmo de balanceamento de carga em um determinado momento, uma vez que a KCL 2.x e a 3.x usam algoritmos diferentes para balancear a carga. A execução de operadores com diferentes algoritmos de balanceamento de carga pode causar uma distribuição de carga abaixo do ideal, pois os dois algoritmos operam de forma independente.

Essa configuração de compatibilidade da KCL 2.x permite que sua aplicação da KCL 3.x seja executada em um modo compatível com a KCL 2.x e use o algoritmo de balanceamento de carga para a KCL 2.x até que todos os operadores da sua aplicação de consumo tenham sido atualizados para a KCL 3.x. Quando a migração for concluída, a KCL mudará automaticamente para o modo de funcionalidade completa da KCL 3.x e começará a usar o novo algoritmo de balanceamento de carga da KCL 3.x para todos os operadores em execução.

**Importante**  
Se você não estiver usando `ConfigsBuilder`, mas criar um objeto `LeaseManagementConfig` para definir as configurações, será preciso adicionar mais um parâmetro chamado `applicationName` na versão 3.x ou posterior da KCL. Para obter detalhes, consulte [Erro de compilação com o LeaseManagementConfig construtor](https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#compiliation-error-leasemanagementconfig). Recomendamos usar `ConfigsBuilder` para definir as configurações da KCL. `ConfigsBuilder` fornece uma maneira mais flexível e sustentável de configurar sua aplicação da KCL.

## Etapa 4: siga as melhores práticas para a implementação do método shutdownRequested()
<a name="kcl-migration-from-2-3-best-practice"></a>

A KCL 3.x introduz um atributo chamado *transferência normal de concessões* para minimizar o reprocessamento de dados quando uma concessão é transferida para outro operador como parte do processo de reatribuição da concessão. Isso é obtido mediante verificação do último número de sequência processado na tabela de concessões antes da transferência da concessão. Para garantir que a transferência normal da concessão funcione corretamente, o objeto `checkpointer` tem de ser invocado dentro do método `shutdownRequested` em sua classe `RecordProcessor`. Se você invocar o objeto `checkpointer` dentro do método `shutdownRequested`, poderá implementá-lo conforme ilustrado no exemplo a seguir. 

**Importante**  
O exemplo de implementação a seguir é um requisito mínimo para uma transferência de concessão normal. Você pode estendê-la para incluir lógica adicional relacionada à verificação, se necessário. Se você estiver executando processamento assíncrono, assegure que todos os registros entregues ao downstream tenham sido processados antes de invocar a verificação. 
Embora a transferência normal da concessão reduza significativamente a probabilidade de reprocessamento de dados durante as transferências de concessão, ela não elimina totalmente essa possibilidade. Para preservar a integridade e a consistência dos dados, projete suas aplicações de consumo downstream como idempotentes. Isso significa que elas devem ser capazes de lidar com o possível processamento de registros duplicados sem efeitos adversos no sistema geral.

```
/**
 * Invoked when either Scheduler has been requested to gracefully shutdown
 * or lease ownership is being transferred gracefully so the current owner
 * gets one last chance to checkpoint.
 *
 * Checkpoints and logs the data a final time.
 *
 * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
 *                               before the shutdown is completed.
 */
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
    try {
       // Ensure that all delivered records are processed 
       // and has been successfully flushed to the downstream before calling 
       // checkpoint
       // If you are performing any asynchronous processing or flushing to
       // downstream, you must wait for its completion before invoking
       // the below checkpoint method.
        log.info("Scheduler is shutting down, checkpointing.");
        shutdownRequestedInput.checkpointer().checkpoint();
    } catch (ShutdownException | InvalidStateException e) {
        log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
    } 
}
```

## Etapa 5: verifique os pré-requisitos da KCL 3.x para coletar métricas de operadores
<a name="kcl-migration-from-2-3-worker-metrics"></a>

A KCL 3.x coleta métricas de utilização da CPU, como a utilização da CPU pelos operadores, para equilibrar uniformemente a carga entre os operadores. Os operadores de aplicações de consumo podem ser executados em Amazon EC2, Amazon ECS, Amazon EKS ou AWS Fargate. A KCL 3.x pode coletar métricas de utilização da CPU dos operadores somente quando os seguintes pré-requisitos forem atendidos:

 **Amazon Elastic Compute Cloud(Amazon EC2)**
+ Seu sistema operacional deve ser Linux OS.
+ Você deve habilitar [IMDSv2](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html)em sua instância do EC2.

 **Amazon Elastic Container Service (Amazon ECS) no Amazon EC2**
+ Seu sistema operacional deve ser Linux OS.
+ É necessário ativar o [endpoint de metadados de tarefas do ECS versão 4](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ec2-metadata.html). 
+ Sua versão do atendente do contêiner do Amazon ECS deve ser 1.39.0 ou posterior.

 **Amazon ECS em AWS Fargate**
+ É necessário habilitar o [endpoint de metadados de tarefas do Fargate versão 4](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4-fargate.html). Se você usar a versão da plataforma 1.4.0 ou posterior do Fargate, essa opção fica habilitada por padrão. 
+ Fargate versão da plataforma 1.4.0 ou posterior.

 **Amazon Elastic Kubernetes Service (Amazon EKS) no Amazon EC2** 
+ Seu sistema operacional deve ser Linux OS.

 **Amazon EKS em AWS Fargate**
+ Plataforma 1.3.0 ou posterior do Fargate.

**Importante**  
Se a KCL 3.x não puder coletar as métricas de utilização da CPU dos operadores porque os pré-requisitos não foram atendidos, ela reequilibrará a carga e o nível de throughput por concessão. Esse mecanismo de rebalanceamento alternativo garante que todos os operadores obtenham níveis de throughput total semelhantes das concessões atribuídas a cada operador. Para obter mais informações, consulte [Como a KCL atribui concessões aos operadores e equilibra a carga](kcl-dynamoDB.md#kcl-assign-leases).

## Etapa 6: atualizar as permissões do IAM para a KCL 3.x
<a name="kcl-migration-from-2-3-IAM-permissions"></a>

É necessário adicionar as permissões a seguir para o perfil do IAM ou a política associada à aplicação de consumo da KCL 3.x. Isso inclui a atualização da política do IAM atual, usada pela aplicação da KCL. Para obter mais informações, consulte [Permissões do IAM necessárias para aplicativos de consumo da KCL](kcl-iam-permissions.md).

**Importante**  
Seus aplicativos da KCL atuais podem não ter as seguintes ações e recursos do IAM adicionados à política do IAM porque não eram necessários na KCL 2.x. Verifique se você os adicionou antes de executar a aplicação da KCL 3.x.  
Ações: `UpdateTable`  
Recursos (ARNs): `arn:aws:dynamodb:region:account:table/KCLApplicationName`
Ações: `Query`  
Recursos (ARNs): `arn:aws:dynamodb:region:account:table/KCLApplicationName/index/*`
Ações: `CreateTable``DescribeTable`,,`Scan`,`GetItem`,`PutItem`,`UpdateItem`, `DeleteItem`  
Recursos (ARNs):`arn:aws:dynamodb:region:account:table/KCLApplicationName-WorkerMetricStats`, `arn:aws:dynamodb:region:account:table/KCLApplicationName-CoordinatorState`
Substitua “região”, “conta” e “KCLApplicationNome” no ARNs por seu próprio Região da AWS Conta da AWS número e nome do aplicativo KCL, respectivamente. Se você usar as configurações para personalizar os nomes das tabelas de metadados criadas pela KCL, use esses nomes de tabela especificados em vez do nome da aplicação da KCL.

## Etapa 7: implantar o código da KCL 3.x em seus operadores
<a name="kcl-migration-from-2-3-IAM-deploy"></a>

Depois de definir a configuração necessária para a migração e concluir todas as listas de verificação de migração anteriores, você pode criar e implantar seu código para seus operadores.

**nota**  
Se você ver um erro de compilação com o `LeaseManagementConfig` construtor, consulte Erro de [compilação com o LeaseManagementConfig construtor para obter informações sobre solução de problemas](https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#compilation-error-leasemanagementconfig).

## Etapa 8: concluir a migração
<a name="kcl-migration-from-2-3-finish"></a>

Durante a implantação do código da KCL 3.x, a KCL continua usando o algoritmo de atribuição de concessões da KCL 2.x. Após a implantação do código da KCL 3.x em todos os seus operadores, a KCL detectará isso automaticamente e passará a adotar o novo algoritmo de atribuição de concessões com base na utilização de recursos pelos operadores. Para obter mais detalhes sobre o novo algoritmo de atribuição de concessões, consulte [Como a KCL atribui concessões aos operadores e equilibra a carga](kcl-dynamoDB.md#kcl-assign-leases).

Durante a implantação, você pode monitorar o processo de migração com as seguintes métricas emitidas para o. CloudWatch É possível monitorar métricas na operação de `Migration`. Todas as métricas são per-KCL-application métricas e são definidas para o nível `SUMMARY` métrico. Se a estatística `Sum` da métrica `CurrentState:3xWorker` corresponder ao número total de operadores em sua aplicação da KCL, a migração para KCL 3.x foi concluída com êxito.

**Importante**  
 A KCL leva pelo menos 10 minutos para mudar para o novo algoritmo de atribuição de concessões depois que todos os operadores estiverem prontos para executá-lo.


**CloudWatch métricas para o processo de migração da KCL**  

| Metrics | Description | 
| --- | --- | 
| CurrentState:3xWorker |  Número de operadores da KCL migrados para a KCL 3.x e que executam o novo algoritmo de atribuição de concessões. Se a contagem `Sum` dessa métrica corresponder ao número total de seus operadores, a migração para a KCL 3.x foi concluída com êxito. [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/pt_br/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| CurrentState:2xCompatibleWorker |  Número de operadores da KCL sendo executados no modo compatível com KCL 2.x durante o processo de migração. Um valor diferente de zero para essa métrica indica que a migração ainda está em andamento. [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/pt_br/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| Fault |  Número de exceções encontradas durante o processo de migração. A maioria dessas exceções é composta por erros transitórios e a KCL 3.x tentará automaticamente concluir a migração novamente. Caso haja um valor métrico `Fault` persistente, revise seus logs do período de migração para solucionar o problema. Se o problema persistir, entre em contato Suporte. [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/pt_br/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| GsiStatusReady |  Status da criação do índice secundário global (GSI) na tabela de concessões. Essa métrica indica se o GSI na tabela de concessões foi criado, pois isso é um pré-requisito para executar a KCL 3.x. O valor é 0 ou 1, com 1 indicando que a criação foi realizada. Durante um estado de reversão, essa métrica não será emitida. Depois de avançar novamente, você pode continuar a monitorar essa métrica. [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/pt_br/streams/latest/dev/kcl-migration-from-2-3.html)  | 
| workerMetricsReady |  Status da emissão de métricas de todos os operadores. As métricas indicam se todos os operadores estão emitindo métricas como, por exemplo, a utilização da CPU. O valor é 0 ou 1, com 1 indicando que todos os operadores estão emitindo métricas e estão prontos para o novo algoritmo de atribuição de concessões. Durante um estado de reversão, essa métrica não será emitida. Depois de avançar novamente, você pode continuar a monitorar essa métrica. [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/pt_br/streams/latest/dev/kcl-migration-from-2-3.html)  | 

O KCL fornece capacidade de reversão para o modo compatível com 2.x durante a migração. Depois que a migração para a KCL 3.x for concluída, recomendamos que você remova a configuração `CoordinatorConfig.clientVersionConfig` de `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` se a reversão não for mais necessária. A remoção dessa configuração interrompe a emissão de métricas relacionadas à migração da aplicação da KCL.

**nota**  
Recomendamos que você monitore o desempenho e a estabilidade da sua aplicação por um período durante a migração e após a conclusão da migração. Se houver um problema, você poderá fazer a reversão para que os operadores usem a funcionalidade compatível com a KCL 2.x por meio da [Ferramenta de migração da KCL.](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)

# Reverter para a versão anterior da KCL
<a name="kcl-migration-rollback"></a>

Este tópico explica as etapas para a reversão de seu consumidor para a versão anterior. Quando for preciso reverter, há um processo de duas etapas: 

1. Execute a [Ferramenta de Migração da KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py).

1. Reimplantar o código da versão anterior da KCL (opcional).

## Etapa 1: executar a Ferramenta de Migração da KCL
<a name="kcl-migration-rollback-tool"></a>

Quando precisar reverter para a versão anterior da KCL, você deve executar a Ferramenta de Migração da KCL. A Ferramenta de migração da KCL executa duas tarefas importantes:
+ Ela remove uma tabela de metadados chamada tabela de métricas do operador e o índice secundário global na tabela de concessões no DynamoDB. Esses dois artefatos são criados pela KCL 3.x, mas não são necessários quando você reverte para a versão anterior.
+ Isso faz com que todos os trabalhadores funcionem em um modo compatível com o KCL 2.x e comecem a usar o algoritmo de balanceamento de carga usado nas versões anteriores do KCL. Se você tiver problemas com o novo algoritmo de balanceamento de carga na KCL 3.x, isso mitigará o problema imediatamente.

**Importante**  
A tabela de estados do coordenador no DynamoDB deve existir e não deve ser excluída durante o processo de migração, reversão e avanço. 

**nota**  
É importante que todos os operadores em sua aplicação de consumidor usem o mesmo algoritmo de balanceamento de carga em um determinado momento. A Ferramenta de migração da KCL garante que todos os operadores em sua aplicação de consumo da KCL 3.x mudem para o modo compatível com a KCL 2.x para que todos os operadores executem o mesmo algoritmo de balanceamento de carga durante a reversão da aplicação para a versão anterior da KCL.

Você pode baixar a [Ferramenta de Migração KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py) no diretório de scripts do repositório [KCL GitHub](https://github.com/awslabs/amazon-kinesis-client/tree/master). O script pode ser executado em qualquer operador ou em qualquer host que tenha as permissões necessárias para gravar na tabela de estados do coordenador, excluir a tabela de métricas do operador e atualizar a tabela de concessões. Consulte [Permissões do IAM necessárias para aplicativos de consumo da KCL](kcl-iam-permissions.md) para obter a permissão necessária do IAM para executar o script. É necessário executar o script uma única vez para cada aplicação da KCL. É possível executar a Ferramenta de migração da KCL com o seguinte comando: 

```
python3 ./KclMigrationTool.py --region <region> --mode rollback [--application_name <applicationName>] [--lease_table_name <leaseTableName>] [--coordinator_state_table_name <coordinatorStateTableName>] [--worker_metrics_table_name <workerMetricsTableName>]
```

**Parâmetros**
+ --region: substitua `<region>` por sua. Região da AWS
+ --application\$1name: esse parâmetro é obrigatório se você estiver usando nomes padrão para suas tabelas de metadados do DynamoDB (tabela de concessões, tabela de estados do coordenador e tabela de métricas do operador). Se você tiver especificado nomes personalizados para essas tabelas, poderá omitir esse parâmetro. Substitua `<applicationName>` pelo nome real da aplicação da KCL. A ferramenta usa esse nome para obter os nomes de tabela padrão se os nomes personalizados não forem fornecidos.
+ --lease\$1table\$1name (opcional): esse parâmetro é necessário quando você define um nome personalizado para a tabela de concessões na configuração da KCL. Se você estiver usando o nome padrão da tabela, poderá omitir esse parâmetro. Substitua `leaseTableName` pelo nome da tabela personalizada que você especificou para a tabela de concessões.
+ --coordinator\$1state\$1table\$1name (opcional): esse parâmetro é necessário quando você define um nome personalizado para a tabela de estados do coordenador na configuração da KCL. Se você estiver usando o nome padrão da tabela, poderá omitir esse parâmetro. Substitua `<coordinatorStateTableName>` pelo nome da tabela personalizada que você especificou para a tabela de estados do coordenador. 
+ --worker\$1metrics\$1table\$1name (opcional): esse parâmetro é necessário quando você define um nome personalizado para a tabela de métricas do operador na configuração da KCL. Se você estiver usando o nome padrão da tabela, poderá omitir esse parâmetro. Substitua `<workerMetricsTableName>` pelo nome da tabela personalizada que você especificou para a tabela de métricas do operador. 

## Etapa 2: reimplantar o código com a versão anterior da KCL (opcional)
<a name="kcl-migration-rollback-redeploy"></a>

 Depois de executar a Ferramenta de Migração da KCL para uma reversão, você verá uma destas mensagens:
+ **Mensagem 1:** “Reversão concluída. Sua aplicação da KCL estava executando o modo compatível com KCL 2.x. Se não houver mitigação de nenhuma regressão, reverta para os binários anteriores da aplicação implantando o código com sua versão anterior da KCL.”
  + **Ação necessária:** Isso significa que seus trabalhadores estavam executando no modo compatível com KCL 2.x. Se o problema persistir, reimplante o código com a versão anterior da KCL para seus operadores.
+ **Mensagem 2: **“Reversão concluída. Sua aplicação da KCL estava executando o modo de funcionalidade da KCL 3.x. A reversão para os binários anteriores da aplicação não é necessária, a menos que você não veja nenhuma mitigação do problema em 5 minutos. Se o problema persistir, reverta para os binários anteriores da aplicação implantando o código com sua versão anterior da KCL.”
  + **Ação necessária:** Isso significa que seus trabalhadores estavam executando no modo KCL 3.x e a Ferramenta de Migração KCL mudou todos os trabalhadores para o modo compatível com KCL 2.x. Se o problema for resolvido, você não precisa reimplantar o código com a versão anterior da KCL. Se o problema persistir, reimplante o código com a versão anterior da KCL para seus operadores.

 

# Avançar para a KCL 3.x após uma reversão
<a name="kcl-migration-rollforward"></a>

Este tópico explica como voltar seu consumidor para a KCL 3.x após uma reversão. Quando precisar avançar, você deve passar por um processo de duas etapas: 

1. Execute a [Ferramenta de Migração da KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py). 

1. Implantar o código com a KCL 3.x.

## Etapa 1: executar a Ferramenta de Migração da KCL
<a name="kcl-migration-rollback-tool"></a>

Execute a Ferramenta de Migração da KCL. Ferramenta de migração da KCL com o seguinte comando para avançar para a KCL 3.x:

```
python3 ./KclMigrationTool.py --region <region> --mode rollforward [--application_name <applicationName>] [--coordinator_state_table_name <coordinatorStateTableName>]
```

**Parâmetros**
+ --region: substitua `<region>` por sua. Região da AWS
+ --application\$1name: esse parâmetro é obrigatório se você estiver usando nomes padrão para a tabela de estados do coordenador. Se você tiver especificado nomes personalizados para a tabela de estados do coordenador, poderá omitir esse parâmetro. Substitua `<applicationName>` pelo nome real da aplicação da KCL. A ferramenta usa esse nome para obter os nomes de tabela padrão se os nomes personalizados não forem fornecidos.
+ --coordinator\$1state\$1table\$1name (opcional): esse parâmetro é necessário quando você define um nome personalizado para a tabela de estados do coordenador na configuração da KCL. Se você estiver usando o nome padrão da tabela, poderá omitir esse parâmetro. Substitua `<coordinatorStateTableName>` pelo nome da tabela personalizada que você especificou para a tabela de estados do coordenador. 

Após a execução da Ferramenta de Migração no modo de avanço, o KCL cria os seguintes recursos do DynamoDB necessários para a KCL 3.x:
+ Um índice secundário global na tabela de concessões
+ Uma tabela de métricas do operador

## Etapa 2: implantar o código com a KCL 3.x
<a name="kcl-migration-rollback-redeploy"></a>

Depois de executar a Ferramenta de Migração da KCL para um avanço, implante seu código com a KCL 3.x nos operadores. Siga [Etapa 8: concluir a migração](kcl-migration-from-2-3.md#kcl-migration-from-2-3-finish) para concluir sua migração.

# Práticas recomendadas para a tabela de concessões com modo de capacidade provisionada
<a name="kcl-migration-lease-table"></a>

Se a tabela de concessões da sua aplicação da KCL foi alterada para o modo de capacidade provisionada, a KCL 3.x cria um índice secundário global na tabela de concessões com o modo de cobrança provisionada e as mesmas unidades de capacidade de leitura (RCU) e de gravação (WCU) como tabela básica de concessões. Quando o índice secundário global for criado, recomendamos que você monitore o uso real no índice secundário global, no console do DynamoDB, e ajuste as unidades de capacidade, se necessário. Para obter um guia mais detalhado sobre como alternar o modo de capacidade das tabelas de metadados do DynamoDB criadas pela KCL, consulte [Modo de capacidade do DynamoDB para tabelas de metadados criadas pela KCL](kcl-dynamoDB.md#kcl-capacity-mode). 

**nota**  
Por padrão, a KCL cria tabelas de metadados, como a tabela de concessões, a tabela de métricas do operador e a tabela de estados do coordenador, além do índice secundário global na tabela de concessões usando o modo de capacidade sob demanda. Recomendamos que você use o modo de capacidade sob demanda para ajustar automaticamente a capacidade com base nas alterações de uso. 

# Migrar da KCL 1.x para a KCL 3.x
<a name="kcl-migration-1-3"></a>

Este tópico explica como migrar seu consumidor da KCL 1.x para a KCL 3.x. A KCL 1.x usa classes e interfaces diferentes em comparação com a KCL 2.x e a KCL 3.x. É necessário primeiro migrar o processador de registros, a fábrica do processador de registros e as classes de operador para o formato compatível com a KCL 2.x/3.x e depois seguir as etapas de migração da KCL 2.x para a KCL 3.x. É possível atualizar diretamente da KCL 1.x para a KCL 3.x.
+ **Etapa 1: migrar o processador de registros**

  Siga a seção [Migrar o processador de registros](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration) na página [Migrar consumidores da KCL 1.x para a KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration).
+ **Etapa 2: migrar a fábrica do processador de discos**

  Siga a seção [Migrar a fábrica do processador de registros](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-factory-migration) na página [Migrar consumidores da KCL 1.x para a KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration).
+ **Etapa 3: migrar o trabalhador**

  Siga a seção [Migrar o operador](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#worker-migration) na página [Migrar consumidores da KCL 1.x para a KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration).
+ **Etapa 4: migrar a configuração da KCL 1.x**

  Siga a seção [Configurar o cliente Amazon Kinesis](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#client-configuration) na página [Migrar consumidores da KCL 1.x para a KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration).
+ **Etapa 5: verifique a remoção do tempo de inatividade e as remoções da configuração do cliente**

  Siga as seções [Remoção do tempo de inatividade](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#idle-time-removal) e [Remoções da configuração do cliente](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#client-configuration-removals) na página [Migrar consumidores da KCL 1.x para a KCL 2.x](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#recrod-processor-migration).
+ **Etapa 6: Siga as step-by-step instruções no guia de migração do KCL 2.x para o KCL 3.x**

  Siga as instruções na página [Migrar da KCL 2.x para a KCL 3.x](kcl-migration-from-2-3.md) para concluir a migração. Se precisar reverter para a versão anterior da KCL ou passar para a KCL 3.x após uma reversão, consulte [Reverter para a versão anterior da KCL](kcl-migration-rollback.md) e [Avançar para a KCL 3.x após uma reversão](kcl-migration-rollforward.md).

**Importante**  
Não use as AWS SDK para Java versões 2.27.19 a 2.27.23 com KCL 3.x. Essas versões têm um problema que causa um erro de exceção relacionado ao uso do DynamoDB da KCL. Recomendamos que você use a AWS SDK para Java versão 2.28.0 ou posterior para evitar esse problema. 

# Documentação da versão anterior da KCL
<a name="kcl-archive"></a>

Os tópicos a seguir foram arquivados. Para ver a documentação atual da Kinesis Client Library, consulte [Usar a Kinesis Client Library](kcl.md).

**Importante**  
As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. O KCL 1.x chegará end-of-support em 30 de janeiro de 2026. É **altamente recomendável** que você migre suas aplicações da KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a página da [biblioteca de cliente do Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) em. GitHub Para obter informações sobre as versões mais recentes da KCL, consulte [Usar a Kinesis Client Library](kcl.md). Para ter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte [Migrar da KCL 1.x para a KCL 3.x](kcl-migration-1-3.md).

**Topics**
+ [Informações da KCL 1.x e 2.x](shared-throughput-kcl-consumers.md)
+ [Desenvolver consumidores personalizados com throughput compartilhada](shared-throughput-consumers.md)
+ [Migrar consumidores da KCL 1.x para a KCL 2.x](kcl-migration.md)

# Informações da KCL 1.x e 2.x
<a name="shared-throughput-kcl-consumers"></a>

**Importante**  
As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. O KCL 1.x chegará end-of-support em 30 de janeiro de 2026. É **altamente recomendável** que você migre suas aplicações da KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a página da [biblioteca de cliente do Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) em. GitHub Para obter informações sobre as versões mais recentes da KCL, consulte [Usar a Kinesis Client Library](kcl.md). Para ter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte [Migrar da KCL 1.x para a KCL 3.x](kcl-migration-1-3.md).

Um dos métodos de desenvolvimento de aplicações de consumo personalizadas que podem processar dados de fluxos de dados do KDS é usar a Kinesis Client Library (KCL).

**Topics**
+ [Sobre a KCL (versões anteriores)](#shared-throughput-kcl-consumers-overview)
+ [Versões anteriores da KCL](#shared-throughput-kcl-consumers-versions)
+ [Conceitos da KCL (versões anteriores)](#shared-throughput-kcl-consumers-concepts)
+ [Usar uma tabela de concessões para monitorar os fragmentos processados pela aplicação de consumo da KCL](#shared-throughput-kcl-consumers-leasetable)
+ [Processar vários fluxos de dados com a mesma aplicação de consumo da KCL 2.x para Java](#shared-throughput-kcl-multistream)
+ [Use o KCL com o Registro do AWS Glue Esquema](#shared-throughput-kcl-consumers-glue-schema-registry)

**nota**  
Recomenda-se o uso da versão mais recente da KCL 1.x ou da KCL 2.x, dependendo do cenário de uso. Ambas as versões da KCL, tanto 1.x como a 2.x, são atualizadas regularmente para incluir os patches de dependência e segurança e as correções de bugs mais recentes, além de novos recursos compatíveis com versões anteriores. Para obter mais informações, consulte [https://github.com/awslabs/amazon-kinesis-client/releases](https://github.com/awslabs/amazon-kinesis-client/releases).

## Sobre a KCL (versões anteriores)
<a name="shared-throughput-kcl-consumers-overview"></a>

A KCL ajuda você a consumir e processar dados de um fluxo de dados do Kinesis lidando com muitas das tarefas complexas associadas à computação distribuída. Isso inclui balanceamento de carga em várias instâncias de aplicações de consumo, resposta a falhas nas instâncias de aplicações de clientes, verificação de registros processados e reação à refragmentação. A KCL cuida de todas essas subtarefas para possibilitar a concetração de esforços na escrita de uma lógica personalizada de processamento de registros.

O KCL é diferente dos Kinesis Data APIs Streams que estão disponíveis no. AWS SDKs O Kinesis APIs Data Streams ajuda você a gerenciar muitos aspectos do Kinesis Data Streams, incluindo a criação de streams, a refragmentação e a colocação e obtenção de registros. A KCL fornece uma camada de abstração em torno de todas essas subtarefas, especificamente para possibilitar a concentração na lógica de processamento de dados personalizada da aplicação de consumo. Para obter informações sobre a API do Kinesis Data Streams, consulte a [Referência de APIs do Amazon Kinesis](https://docs.aws.amazon.com/kinesis/latest/APIReference/Welcome.html).

**Importante**  
A KCL é uma biblioteca Java. O suporte para linguagens diferentes de Java é fornecido usando uma interface multilíngue chamada MultiLangDaemon. Esse daemon baseado em Java é executado em segundo plano quando uma linguagem de KCL diferente de Java é utilizada. Por exemplo, se você instalar o KCL para Python e escrever seu aplicativo de consumidor inteiramente em Python, você ainda precisará do Java instalado em seu sistema por causa do. MultiLangDaemon Além disso, MultiLangDaemon tem algumas configurações padrão que talvez você precise personalizar para seu caso de uso, por exemplo, a AWS região à qual ele se conecta. Para obter mais informações sobre o MultiLangDaemon on GitHub, consulte o [ MultiLangDaemon projeto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

A KCL atua como um intermediário entre a lógica de processamento de registros e o Kinesis Data Streams. 

## Versões anteriores da KCL
<a name="shared-throughput-kcl-consumers-versions"></a>

Atualmente, é possível usar qualquer uma destas versões compatíveis da KCL para criar aplicações de consumo personalizadas:
+ **KCL 1.x**

  Para obter mais informações, consulte [Desenvolver aplicações de consumo da KCL 1.x](developing-consumers-with-kcl.md).
+ **KCL 2.x**

  Para obter mais informações, consulte [Desenvolver aplicações de consumo da KCL 2.x](developing-consumers-with-kcl-v2.md).

Pode-se usar a KCL 1.x ou a KCL 2.x para criar aplicações de consumo que usam throughput compartilhada. Para obter mais informações, consulte [Desenvolver aplicações de consumo personalizadas com throughput compartilhada usando a KCL](custom-kcl-consumers.md).

Para criar aplicações de consumo que usam throughput dedicada (consumidores de distribuição avançada), só é possível usar a KCL 2.x. Para obter mais informações, consulte [Desenvolver consumidores de distribuição avançada com throughput dedicado](enhanced-consumers.md).

Para obter informações sobre as diferenças entre a KCL 1.x e a KCL 2.x e instruções sobre como migrar da KCL 1.x para a KCL 2.x, consulte [Migrar consumidores da KCL 1.x para a KCL 2.x](kcl-migration.md).

## Conceitos da KCL (versões anteriores)
<a name="shared-throughput-kcl-consumers-concepts"></a>
+ **Aplicação de consumo da KCL**: uma aplicação personalizada que usa a KCL e é projetada para ler e processar registros de fluxos de dados. 
+ **Instância de aplicação de consumo**: as aplicações de cliente da KCL normalmente são distribuídas, com uma ou mais instâncias executadas simultaneamente para coordenar falhas e balancear dinamicamente a carga de processamento dos registros de dados.
+ **Operador**: uma classe de alto nível que uma instância de aplicação de consumo da KCL usa para começar a processar dados. 
**Importante**  
Cada instância da aplicação de consumo da KCL tem um operador. 

  O operador inicializa e supervisiona várias tarefas, incluindo a sincronização de informações de fragmentos e concessões, o monitoramento de atribuições de fragmentos e o processamento dos dados dos fragmentos. Um trabalhador fornece à KCL as informações de configuração do aplicativo consumidor, como o nome do fluxo de dados cujos registros de dados esse aplicativo consumidor KCL processará e AWS as credenciais necessárias para acessar esse fluxo de dados. O operador também inicia a instância específica da aplicação de consumo da KCL para entregar registros de dados do fluxo de dados aos processadores de registros.
**Importante**  
Na KCL 1.x, essa classe é chamada de **operador**. Para obter mais informações (esses são os repositórios Java KCL), consulte [https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java).java. Na KCL 2.x, essa classe é chamada de **Agendador**. A finalidade do Agendador na KCL 2.x é idêntica à finalidade do operador na KCL 1.x. [Para obter mais informações sobre a classe Scheduler no KCL 2.x, consulte https://github.com/awslabs/amazon-kinesis-client/.java. blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java) 
+ **Concessão**: dado que define a ligação entre um operador e um fragmento. As aplicações de cliente distribuídas da KCL usam concessões para particionar o processamento de registros de dados em uma frota de operadores. A qualquer momento, cada fragmento de registros de dados é associado a um determinado operador por uma concessão identificada pela variável **leaseKey**. 

  Por padrão, um trabalhador pode manter um ou mais arrendamentos (sujeito ao valor da variável **maxLeasesForWorker**) ao mesmo tempo. 
**Importante**  
Os operadores competem para manter todas as concessões disponíveis para todos os fragmentos disponíveis em um fluxo de dados. Mas apenas um operador consegue manter uma concessão de cada vez. 

  Por exemplo, se houver uma instância da aplicação de consumo A com o operador A que está processando um fluxo de dados com quatro fragmentos, o operador A poderá reter as concessões aos fragmentos 1, 2, 3 e 4 ao mesmo tempo. Mas, se houver duas instâncias de aplicações de consumo A e B com os operadores A e B, e essas instâncias estiverem processando um fluxo de dados com quatro fragmentos, o operador A e o operador B não poderão reter a concessão ao fragmento 1 ao mesmo tempo. Um operador retém a concessão a um fragmento específico até estar pronto para parar de processar os registros de dados do fragmento ou até que uma falha ocorra. Quando um operador libera a concessão, outro operador a assume e a retém. 

  Para obter mais informações (esses são os repositórios Java KCL), consulte [https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java para KCL 1.x e [https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java).java](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java) para KCL 2.x.
+ **Tabela de concessões**: uma tabela exclusiva do Amazon DynamoDB usada para monitorar os fragmentos em um fluxo de dados do KDS vinculados a uma concessão e sendo processados pelos operadores da aplicação de consumo da KCL. A tabela de concessões precisa permanecer sincronizada (em um operador e entre todos os operadores) com as informações mais recentes do fragmento do fluxo de dados enquanto a aplicação de consumo da KCL está em execução. Para obter mais informações, consulte [Usar uma tabela de concessões para monitorar os fragmentos processados pela aplicação de consumo da KCL](#shared-throughput-kcl-consumers-leasetable).
+ **Processador de registros**: a lógica que define como a aplicação de consumo da KCL processa os dados obtidos dos fluxos de dados. Em runtime, uma instância da aplicação de consumo da KCL inicia um operador, que, por sua vez, inicia um processador de registros para cada fragmento cuja concessão retém. 

## Usar uma tabela de concessões para monitorar os fragmentos processados pela aplicação de consumo da KCL
<a name="shared-throughput-kcl-consumers-leasetable"></a>

**Topics**
+ [O que é uma tabela de concessões](#shared-throughput-kcl-consumers-what-is-leasetable)
+ [Throughput](#shared-throughput-kcl-leasetable-throughput)
+ [Como a tabela de concessões é sincronizada com fragmentos em um fluxo de dados do KDS](#shared-throughput-kcl-consumers-leasetable-sync)

### O que é uma tabela de concessões
<a name="shared-throughput-kcl-consumers-what-is-leasetable"></a>

Em cada aplicação do Amazon Kinesis Data Streams, a KCL usa uma tabela de concessões exclusiva (armazenada em uma tabela do Amazon DynamoDB) para monitorar os fragmentos em um fluxo de dados do KDS vinculados a uma concessão e sendo processados pelos operadores da aplicação de consumo da KCL.

**Importante**  
Como a KCL usa o nome da aplicação de consumo para criar o nome da tabela de concessões que a aplicação usa, cada aplicação de consumo deve ter um nome exclusivo.

É possível visualizar a tabela usando o [console do Amazon DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ConsoleDynamoDB.html) enquanto a aplicação de consumo está em execução.

Se a tabela de concessões da aplicação de consumo da KCL não existir quando a aplicação for inicializada, um dos operadores a criará. 

**Importante**  
 Sua conta é cobrada pelos custos associados à tabela do DynamoDB, além dos custos associados ao próprio Kinesis Data Streams. 

Cada linha na tabela de concessões representa um fragmento que está sendo processado pelos operadores da aplicação de consumo. Se a aplicação de consumo da KCL processar somente um fluxo de dados, a chave de hash da tabela de concessões, `leaseKey`, será o ID do fragmento. Em caso de [Processar vários fluxos de dados com a mesma aplicação de consumo da KCL 2.x para Java](#shared-throughput-kcl-multistream), a estrutura da leaseKey será semelhante a `account-id:StreamName:streamCreationTimestamp:ShardId`. Por exemplo, .`111111111:multiStreamTest-1:12345:shardId-000000000336`

Além do ID do fragmento, cada linha também inclui os seguintes dados:
+ **checkpoint:** número de sequência do ponto de verificação mais recente do fragmento. Esse valor é exclusivo entre todos os fragmentos no fluxo de dados.
+ **checkpointSubSequenceNúmero:** ao usar o recurso de agregação da Kinesis Producer Library, essa é uma extensão do **ponto de verificação** que rastreia registros individuais de usuários dentro do registro do Kinesis.
+ **leaseCounter:** usado para versionamento de concessão, para que os operadores possam detectar se a própria concessão foi assumida por outro operador.
+ **leaseKey:** um identificador exclusivo para uma concessão. Cada concessão é específica a um fragmento no fluxo de dados e é retida por um operador por vez.
+ **leaseOwner:** o operador que está retendo essa concessão.
+ **ownerSwitchesSincePonto de verificação:** Quantas vezes esse contrato de arrendamento mudou de trabalhador desde a última vez que um posto de controle foi escrito.
+ **parentShardId:** usado para garantir que o fragmento principal seja totalmente processado antes do início do processamento nos fragmentos secundários. Isso garante que os registros sejam processados na mesma ordem em que foram colocados no fluxo.
+ **hashrange:** usado pelo `PeriodicShardSyncManager` para executar sincronizações periódicas a fim de encontrar fragmentos ausentes na tabela de concessões e criar concessões para eles, se necessário. 
**nota**  
A partir da KCL 1.14 e da KCL 2.3, esse dado está presente na tabela de concessões de cada fragmento. Para obter mais informações sobre `PeriodicShardSyncManager` e a sincronização periódica entre concessões e fragmentos, consulte [Como a tabela de concessões é sincronizada com fragmentos em um fluxo de dados do KDS](#shared-throughput-kcl-consumers-leasetable-sync).
+ **childshards:** usado por `LeaseCleanupManager` para revisar o status de processamento do fragmento filho e decidir se o fragmento pai pode ser excluído da tabela de concessões.
**nota**  
A partir da KCL 1.14 e da KCL 2.3, esse dado está presente na tabela de concessões de cada fragmento.
+ **shardID:** o ID do fragmento.
**nota**  
Esse dado só estará presente na tabela de concessões se você estiver [Processar vários fluxos de dados com a mesma aplicação de consumo da KCL 2.x para Java](#shared-throughput-kcl-multistream). Isso só tem suporte na KCL 2.x para Java, a partir da KCL 2.3 para Java e versões posteriores. 
+ **stream name** o identificador do fluxo de dados no formato `account-id:StreamName:streamCreationTimestamp`.
**nota**  
Esse dado só estará presente na tabela de concessões se você estiver [Processar vários fluxos de dados com a mesma aplicação de consumo da KCL 2.x para Java](#shared-throughput-kcl-multistream). Isso só tem suporte na KCL 2.x para Java, a partir da KCL 2.3 para Java e versões posteriores. 

### Throughput
<a name="shared-throughput-kcl-leasetable-throughput"></a>

Se sua aplicação do Amazon Kinesis Data Streams receber exceções de throughput provisionado, é necessário aumentar o throughput provisionado para a tabela do DynamoDB. A KCL cria a tabela com um throughput provisionado de 10 leituras por segundo e 10 gravações por segundo, mas isso pode não ser suficiente para a aplicação. Por exemplo, se uma aplicação do Amazon Kinesis Data Streams definir pontos de verificação ou usar operadores com frequência em um fluxo de dados composto por vários fragmentos, talvez seja necessário um throughput maior.

Para obter informações sobre o throughput provisionado no DynamoDB, consulte [Modo de capacidade de leitura/gravação](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html) e [Trabalhar com tabelas e dados no DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithDDTables.html) no *Guia do desenvolvedor do Amazon DynamoDB*.

### Como a tabela de concessões é sincronizada com fragmentos em um fluxo de dados do KDS
<a name="shared-throughput-kcl-consumers-leasetable-sync"></a>

Os operadores das aplicações de consumo da KCL usam concessões para processar fragmentos de um determinado fluxo de dados. As informações sobre qual operador usa a concessão a um fragmento em um momento determinado são armazenadas em uma tabela de concessões. A tabela de concessões precisa permanecer sincronizada com as informações mais recentes do fragmento do fluxo de dados enquanto a aplicação de consumo da KCL está em execução. A KCL sincroniza a tabela de concessões com as informações de fragmentos adquiridas do serviço Kinesis Data Streams durante a inicialização ou o reinício da aplicação de consumo e sempre que um fragmento sendo processado chega ao fim (refragmentação). Em outras palavras, os operadores ou uma aplicação de consumo da KCL são sincronizados com o fluxo de dados que estão processando durante a inicialização da aplicação e sempre que a aplicação encontra um evento de refragmentação do fluxo de dados.

**Topics**
+ [Sincronização na KCL 1.0 - 1.13 e na KCL 2.0 - 2.2](#shared-throughput-kcl-consumers-leasetable-sync-old)
+ [Sincronização na KCL 2.x a partir da KCL 2.3 e em versões posteriores](#shared-throughput-kcl-consumers-leasetable-sync-new-kcl2)
+ [Sincronização na KCL 1.x a partir da KCL 1.14 e em versões posteriores](#shared-throughput-kcl-consumers-leasetable-sync-new-kcl1)

#### Sincronização na KCL 1.0 - 1.13 e na KCL 2.0 - 2.2
<a name="shared-throughput-kcl-consumers-leasetable-sync-old"></a>

No KCL 1.0 - 1.13 e no KCL 2.0 - 2.2, durante a inicialização do aplicativo consumidor e também durante cada evento de refragmentação do fluxo de dados, o KCL sincroniza a tabela de leasing com as informações de fragmentos adquiridas do serviço Kinesis Data Streams invocando a ou a descoberta. `ListShards` `DescribeStream` APIs Em todas as versões do KCL listadas acima, cada trabalhador de um aplicativo consumidor do KCL conclui as seguintes etapas para realizar o processo de lease/shard sincronização durante a inicialização do aplicativo consumidor e em cada evento de refragmentação do stream:
+ Busca todos os fragmentos de dados do fluxo sendo processado
+ Busca todas as concessões do fragmento da tabela de concessões
+ Filtra cada fragmento aberto sem uma concessão na tabela de concessões
+ Itera em todos os fragmentos abertos encontrados e, para cada fragmento aberto sem pai aberto:
  + Percorre a árvore hierárquica no caminho dos ancestrais para determinar se o fragmento é um descendente. Um fragmento será considerado descendente se um fragmento ancestral estiver sendo processado (a entrada de concessão do fragmento ancestral existe na tabela de concessões) ou se houver um fragmento ancestral que deve ser processado (por exemplo, a posição inicial é `TRIM_HORIZON` ou `AT_TIMESTAMP`).
  + Se o fragmento aberto for descendente, a KCL verificará sua posição inicial e criará concessões para seus pais, se necessário.

#### Sincronização na KCL 2.x a partir da KCL 2.3 e em versões posteriores
<a name="shared-throughput-kcl-consumers-leasetable-sync-new-kcl2"></a>

A partir das versões mais recentes compatíveis da KCL 2.x (KCL 2.3) e posteriores, a biblioteca oferece suporte às alterações no processo de sincronização listadas a seguir. Essas alterações de lease/shard sincronização reduzem significativamente o número de chamadas de API feitas pelos aplicativos consumidores da KCL para o serviço Kinesis Data Streams e otimizam o gerenciamento de leasing em seu aplicativo consumidor da KCL. 
+ Na inicialização da aplicação, se a tabela de concessões estiver vazia, a KCL utilizará a opção de filtragem da API `ListShard` (o parâmetro de solicitação `ShardFilter` opcional) para recuperar e criar concessões somente para um instantâneo dos fragmentos abertos no momento especificado pelo parâmetro `ShardFilter`. O parâmetro `ShardFilter` permite filtrar a resposta da API `ListShards`. A única propriedade obrigatória do parâmetro `ShardFilter` é `Type`. A KCL usa a propriedade de filtro `Type` e os seguintes valores válidos para identificar e retornar um instantâneo dos fragmentos abertos que podem exigir novas concessões:
  + `AT_TRIM_HORIZON`: a resposta inclui todos os fragmentos abertos em`TRIM_HORIZON`. 
  + `AT_LATEST`: a resposta inclui somente os fragmentos do fluxo de dados abertos no momento. 
  + `AT_TIMESTAMP`: a resposta inclui todos os fragmentos com timestamp inicial menor ou igual ao timestamp fornecido e timestamp final maior ou igual ao timestamp fornecido ou ainda abertos.

  `ShardFilter` é usado ao criar uma concessão em uma tabela de concessões vazia para inicializar concessões para um instantâneo dos fragmentos especificados em `RetrievalConfig#initialPositionInStreamExtended`.

  Para saber mais sobre o `ShardFilter`, consulte [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html).
+ Em vez de todos os trabalhadores realizarem a lease/shard sincronização para manter a tabela de leasing atualizada com os fragmentos mais recentes no fluxo de dados, um único líder de trabalhadores eleito executa a sincronização de arrendamento/fragmento.
+ O KCL 2.3 usa o parâmetro de `ChildShards` retorno do `GetRecords` e do `SubscribeToShard` APIs para realizar a lease/shard sincronização que acontece em `SHARD_END` para fragmentos fechados, permitindo que um trabalhador do KCL crie concessões somente para os fragmentos secundários do fragmento que ele concluiu o processamento. Para aplicativos compartilhados em todos os consumidores, essa otimização da lease/shard sincronização usa o `ChildShards` parâmetro da `GetRecords` API. Para aplicativos de consumo de taxa de transferência dedicada (fan-out aprimorado), essa otimização da lease/shard sincronização usa o `ChildShards` parâmetro da API. `SubscribeToShard` Para obter mais informações, consulte [GetRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html), [SubscribeToShards](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html) e [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html).
+ Com as mudanças acima, o comportamento da KCL está passando de um modelo no qual todos os operadores obtêm informações de todos os fragmentos existentes para um modelo em que os operadores só obtêm informações dos filhos do fragmento que possui. Portanto, além da sincronização que ocorre durante a inicialização do aplicativo do consumidor e os eventos de refragmentação, a KCL agora também realiza shard/lease varreduras periódicas adicionais para identificar possíveis falhas na tabela de leasing (em outras palavras, para conhecer todos os novos fragmentos) para garantir que o intervalo de hash completo do fluxo de dados esteja sendo processado e criar concessões para eles, se necessário. `PeriodicShardSyncManager`é o componente responsável pela execução de lease/shard varreduras periódicas. 

  Para obter mais informações sobre o `PeriodicShardSyncManager` KCL 2.3, consulte [https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java \$1L201](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L201-L213) -L213.

  A KCL 2.3 tem novas opções disponíveis para configuração de `PeriodicShardSyncManager` em `LeaseManagementConfig`:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/pt_br/streams/latest/dev/shared-throughput-kcl-consumers.html)

  Agora, novas CloudWatch métricas também são emitidas para monitorar a integridade do`PeriodicShardSyncManager`. Para obter mais informações, consulte [PeriodicShardSyncManager](monitoring-with-kcl.md#periodic-task).
+ Inclui uma otimização de `HierarchicalShardSyncer` para criar apenas concessões em uma camada de fragmentos.

#### Sincronização na KCL 1.x a partir da KCL 1.14 e em versões posteriores
<a name="shared-throughput-kcl-consumers-leasetable-sync-new-kcl1"></a>

A partir das versões mais recentes compatíveis da KCL 1.x (KCL 1.14) e posteriores, a biblioteca oferece suporte às alterações no processo de sincronização listadas a seguir. Essas alterações de lease/shard sincronização reduzem significativamente o número de chamadas de API feitas pelos aplicativos consumidores da KCL para o serviço Kinesis Data Streams e otimizam o gerenciamento de leasing em seu aplicativo consumidor da KCL. 
+ Na inicialização da aplicação, se a tabela de concessões estiver vazia, a KCL utilizará a opção de filtragem da API `ListShard` (o parâmetro de solicitação `ShardFilter` opcional) para recuperar e criar concessões somente para um instantâneo dos fragmentos abertos no momento especificado pelo parâmetro `ShardFilter`. O parâmetro `ShardFilter` permite filtrar a resposta da API `ListShards`. A única propriedade obrigatória do parâmetro `ShardFilter` é `Type`. A KCL usa a propriedade de filtro `Type` e os seguintes valores válidos para identificar e retornar um instantâneo dos fragmentos abertos que podem exigir novas concessões:
  + `AT_TRIM_HORIZON`: a resposta inclui todos os fragmentos abertos em`TRIM_HORIZON`. 
  + `AT_LATEST`: a resposta inclui somente os fragmentos do fluxo de dados abertos no momento. 
  + `AT_TIMESTAMP`: a resposta inclui todos os fragmentos com timestamp inicial menor ou igual ao timestamp fornecido e timestamp final maior ou igual ao timestamp fornecido ou ainda abertos.

  `ShardFilter` é usado ao criar uma concessão em uma tabela de concessões vazia para inicializar concessões para um instantâneo dos fragmentos especificados em `KinesisClientLibConfiguration#initialPositionInStreamExtended`.

  Para saber mais sobre o `ShardFilter`, consulte [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html).
+ Em vez de todos os trabalhadores realizarem a lease/shard sincronização para manter a tabela de leasing atualizada com os fragmentos mais recentes no fluxo de dados, um único líder de trabalhadores eleito executa a sincronização de arrendamento/fragmento.
+ O KCL 1.14 usa o parâmetro de `ChildShards` retorno do `GetRecords` e do `SubscribeToShard` APIs para realizar a lease/shard sincronização que acontece em `SHARD_END` para fragmentos fechados, permitindo que um trabalhador do KCL crie concessões somente para os fragmentos secundários do fragmento que ele concluiu o processamento. Para obter mais informações, consulte [GetRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html) e [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html).
+ Com as mudanças acima, o comportamento da KCL está passando de um modelo no qual todos os operadores obtêm informações de todos os fragmentos existentes para um modelo em que os operadores só obtêm informações dos filhos do fragmento que possui. Portanto, além da sincronização que ocorre durante a inicialização do aplicativo do consumidor e os eventos de refragmentação, a KCL agora também realiza shard/lease varreduras periódicas adicionais para identificar possíveis falhas na tabela de leasing (em outras palavras, para conhecer todos os novos fragmentos) para garantir que o intervalo de hash completo do fluxo de dados esteja sendo processado e criar concessões para eles, se necessário. `PeriodicShardSyncManager`é o componente responsável pela execução de lease/shard varreduras periódicas. 

  Quando `KinesisClientLibConfiguration#shardSyncStrategyType` é definido como `ShardSyncStrategyType.SHARD_END`, `PeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThreshold` é usado para determinar o limite do número de varreduras consecutivas contendo lacunas na tabela de concessões após o qual é necessário impor uma sincronização de fragmentos. Quando `KinesisClientLibConfiguration#shardSyncStrategyType` é definido como `ShardSyncStrategyType.PERIODIC`, `leasesRecoveryAuditorInconsistencyConfidenceThreshold` é ignorado.

  Para obter mais informações sobre o `PeriodicShardSyncManager` KCL 1.14, consulte [https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java](https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L987-L999) \$1L987 -L999.

  A KCL 1.14 tem uma nova opção disponível para configuração de `PeriodicShardSyncManager` em `LeaseManagementConfig`:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/pt_br/streams/latest/dev/shared-throughput-kcl-consumers.html)

  Agora, novas CloudWatch métricas também são emitidas para monitorar a integridade do`PeriodicShardSyncManager`. Para obter mais informações, consulte [PeriodicShardSyncManager](monitoring-with-kcl.md#periodic-task).
+ A KCL 1.14 agora também oferece suporte à limpeza adiada de concessões. As concessões são excluídas de forma assíncrona por `LeaseCleanupManager` ao chegar ao `SHARD_END` quando um fragmento ultrapassar o período de retenção do fluxo de dados ou quando for fechado por uma operação de refragmentação.

  Novas opções disponíveis para configuração de `LeaseCleanupManager`:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/pt_br/streams/latest/dev/shared-throughput-kcl-consumers.html)
+ Inclui uma otimização de `KinesisShardSyncer` para criar apenas concessões em uma camada de fragmentos.

## Processar vários fluxos de dados com a mesma aplicação de consumo da KCL 2.x para Java
<a name="shared-throughput-kcl-multistream"></a>

Esta seção descreve as seguintes alterações na KCL 2.x para Java que permitem criar aplicações de consumo da KCL que podem processar mais de um fluxo de dados ao mesmo tempo. 

**Importante**  
O processamento multifluxo só tem suporte na KCL 2.x para Java, a partir da KCL 2.3 para Java e versões posteriores.   
O processamento multifluxo NÃO tem suporte em nenhuma outra linguagem na qual a KCL 2.x possa ser implementada.  
O processamento multifluxo NÃO tem suporte em nenhuma versão da KCL 1.x.
+ **MultistreamTracker interface**

  Para criar um aplicativo de consumidor que possa processar vários fluxos ao mesmo tempo, você deve implementar uma nova interface chamada [MultistreamTracker](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java). Essa interface inclui o método `streamConfigList`, que retorna a lista de fluxos de dados, e suas configurações, a serem processados pela aplicação de consumo da KCL. Observe que os fluxos de dados sendo processados podem ser alterados durante o runtime da aplicação de consumo. `streamConfigList` é chamado periodicamente pela KCL para obter informações das mudanças nos fluxos de dados a serem processados.

  O `streamConfigList` método preenche a [StreamConfig](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java#L23)lista. 

  ```
  package software.amazon.kinesis.common;
  
  import lombok.Data;
  import lombok.experimental.Accessors;
  
  @Data
  @Accessors(fluent = true)
  public class StreamConfig {
      private final StreamIdentifier streamIdentifier;
      private final InitialPositionInStreamExtended initialPositionInStreamExtended;
      private String consumerArn;
  }
  ```

  Observe que os campos `StreamIdentifier` e `InitialPositionInStreamExtended` são obrigatórios, enquanto `consumerArn` é opcional. Só é necessário fornecer `consumerArn` se a KCL 2.x estiver sendo usada para implementar uma aplicação de consumo de distribuição avançada.

  Para obter mais informações sobre`StreamIdentifier`, consulte [https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java \$1L129](https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129). Para criar um `StreamIdentifier`, recomenda-se a criação de uma instância multifluxo a partir do `streamArn` e do `streamCreationEpoch` que esteja disponível na v2.5.0 e versões posteriores. Na KCL v2.3 e v2.4, que não oferecem suporte ao `streamArm`, crie uma instância multifluxo usando o formato `account-id:StreamName:streamCreationTimestamp`. Esse formato será descontinuado e não terá mais suporte a partir da próxima versão principal.

  `MultistreamTracker` também inclui uma estratégia para excluir concessões de fluxos antigos na tabela de concessões (`formerStreamsLeasesDeletionStrategy`). Observe que a estratégia NÃO PODE ser alterada durante o runtime da aplicação de consumo. Para obter mais informações, consulte [https://github.com/awslabs/amazon-kinesis-clientb/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy/blob/0c5042dadf794fe988438436252a5a8fe70b6b0](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java) .java
+ [ConfigsBuilder](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java)é uma classe de todo o aplicativo que você pode usar para especificar todas as configurações do KCL 2.x a serem usadas ao criar seu aplicativo consumidor KCL. `ConfigsBuilder`a classe agora tem suporte para a `MultistreamTracker` interface. Você pode inicializar ConfigsBuilder com o nome do único fluxo de dados do qual consumir registros:

  ```
   /**
       * Constructor to initialize ConfigsBuilder with StreamName
       * @param streamName
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.right(streamName);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```

  Ou você pode inicializar ConfigsBuilder com `MultiStreamTracker` se quiser implementar um aplicativo consumidor KCL que processe vários fluxos ao mesmo tempo.

  ```
  * Constructor to initialize ConfigsBuilder with MultiStreamTracker
       * @param multiStreamTracker
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.left(multiStreamTracker);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```
+ Com o suporte multifluxo implementado na aplicação de consumo da KCL, cada linha da tabela de concessões da aplicação contém o ID do fragmento e o nome do fluxo que a aplicação processa. 
+ Quando o suporte multifluxo para sua aplicação de consumo da KCL é implementado, leaseKey assume a estrutura `account-id:StreamName:streamCreationTimestamp:ShardId`. Por exemplo, .`111111111:multiStreamTest-1:12345:shardId-000000000336`
**Importante**  
Quando sua aplicação de consumo da KCL está configurada para processar somente um fluxo de dados, leaseKey (a chave de hash da tabela de concessões) é o ID do fragmento. Ao reconfigurar a aplicação de consumo da KCL existente para processar vários fluxos de dados, a tabela de concessões será quebrada, pois no suporte multifluxo, a estrutura leaseKey deve ser a `account-id:StreamName:StreamCreationTimestamp:ShardId`.

## Use o KCL com o Registro do AWS Glue Esquema
<a name="shared-throughput-kcl-consumers-glue-schema-registry"></a>

Você pode integrar seus streams de dados do Kinesis com o AWS Glue Schema Registry. O registro de esquemas do AWS Glue permite detectar, controlar e evoluir esquemas centralmente, ao mesmo tempo que garante que os dados produzidos sejam validados continuamente por um esquema registrado. O esquema define a estrutura e o formato de um registro de dados. Um esquema é uma especificação versionada para publicação, consumo ou datastore confiáveis. O AWS Glue Schema Registry permite que você melhore a qualidade end-to-end dos dados e a governança de dados em seus aplicativos de streaming. Para obter mais informações, consulte [Registro de esquemas do AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). Uma das formas de configurar essa integração é usando a KCL em Java. 

**Importante**  
Atualmente, a integração do Kinesis Data AWS Glue Streams e do Schema Registry só é compatível com os streams de dados do Kinesis que usam consumidores do KCL 2.3 implementados em Java. Não há suporte para múltiplas linguagens. Os clientes da KCL 1.0 não são compatíveis. Os clientes da KCL 2.x anteriores à KCL 2.3 não são compatíveis.

Para obter instruções detalhadas sobre como configurar a integração do Kinesis Data Streams com o Schema Registry usando o KCL, consulte a [seção “Interagindo com dados KPL/KCL usando as bibliotecas” em Caso de uso: integração do Amazon Kinesis Data Streams](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds) com o Glue Schema Registry. AWS 

# Desenvolver consumidores personalizados com throughput compartilhada
<a name="shared-throughput-consumers"></a>

**Importante**  
As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. O KCL 1.x chegará end-of-support em 30 de janeiro de 2026. É **altamente recomendável** que você migre suas aplicações da KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a página da [biblioteca de cliente do Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) em. GitHub Para obter informações sobre as versões mais recentes da KCL, consulte [Usar a Kinesis Client Library](kcl.md). Para ter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte [Migrar da KCL 1.x para a KCL 3.x](kcl-migration-1-3.md).

Caso não seja necessário um throughput específico ao receber dados do Kinesis Data Streams, nem atrasos de propagação de leitura de até 200 ms, pode-se criar aplicações de consumo seguindo as etapas descritas nos tópicos a seguir. É possível usar a Kinesis Client Library (KCL) ou o AWS SDK para Java.

**Topics**
+ [Desenvolver aplicações de consumo personalizadas com throughput compartilhada usando a KCL](custom-kcl-consumers.md)

Para obter informações sobre a criação de consumidores que podem receber registros de fluxos de dados do Kinesis com throughput dedicada, consulte [Desenvolver consumidores de distribuição avançada com throughput dedicado](enhanced-consumers.md).

# Desenvolver aplicações de consumo personalizadas com throughput compartilhada usando a KCL
<a name="custom-kcl-consumers"></a>

**Importante**  
As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. O KCL 1.x chegará end-of-support em 30 de janeiro de 2026. É **altamente recomendável** que você migre suas aplicações da KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a página da [biblioteca de cliente do Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) em. GitHub Para obter informações sobre as versões mais recentes da KCL, consulte [Usar a Kinesis Client Library](kcl.md). Para ter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte [Migrar da KCL 1.x para a KCL 3.x](kcl-migration-1-3.md).

Um dos métodos de desenvolvimento de aplicações de consumo personalizadas com throughput compartilhada envolve o uso da Kinesis Client Library (KCL). 

Escolha um dos tópicos a seguir para a versão KCL que esteja sendo usada.

**Topics**
+ [Desenvolver aplicações de consumo da KCL 1.x](developing-consumers-with-kcl.md)
+ [Desenvolver aplicações de consumo da KCL 2.x](developing-consumers-with-kcl-v2.md)

# Desenvolver aplicações de consumo da KCL 1.x
<a name="developing-consumers-with-kcl"></a>

**Importante**  
As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. O KCL 1.x chegará end-of-support em 30 de janeiro de 2026. É **altamente recomendável** que você migre suas aplicações da KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a página da [biblioteca de cliente do Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) em. GitHub Para obter informações sobre as versões mais recentes da KCL, consulte [Usar a Kinesis Client Library](kcl.md). Para ter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte [Migrar da KCL 1.x para a KCL 3.x](kcl-migration-1-3.md).

É possível criar uma aplicação de consumo para o Amazon Kinesis Data Streams usando a Kinesis Client Library (KCL). 

Para obter mais informações sobre o KCL, consulte [Sobre a KCL (versões anteriores)](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-overview).

Escolha um dos seguintes tópicos, dependendo do que deseja usar.

**Topics**
+ [Desenvolver uma aplicação de consumo da Kinesis Client Library em Java](kinesis-record-processor-implementation-app-java.md)
+ [Desenvolver uma aplicação de consumo da Kinesis Client Library em Node.js](kinesis-record-processor-implementation-app-nodejs.md)
+ [Desenvolver uma aplicação de consumo da Kinesis Client Library em .NET](kinesis-record-processor-implementation-app-dotnet.md)
+ [Desenvolver uma aplicação de consumo da Kinesis Client Library em Python](kinesis-record-processor-implementation-app-py.md)
+ [Desenvolver uma aplicação de consumo da Kinesis Client Library em Ruby](kinesis-record-processor-implementation-app-ruby.md)

# Desenvolver uma aplicação de consumo da Kinesis Client Library em Java
<a name="kinesis-record-processor-implementation-app-java"></a>

**Importante**  
As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. O KCL 1.x chegará end-of-support em 30 de janeiro de 2026. É **altamente recomendável** que você migre suas aplicações da KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a página da [biblioteca de cliente do Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) em. GitHub Para obter informações sobre as versões mais recentes da KCL, consulte [Usar a Kinesis Client Library](kcl.md). Para ter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte [Migrar da KCL 1.x para a KCL 3.x](kcl-migration-1-3.md).

É possível usar a Kinesis Client Library (KCL) para criar aplicações que processam dados dos fluxos de dados do Kinesis. A Kinesis Client Library está disponível em várias linguagens. Este tópico discute Java. Para ver a referência de Javadoc, consulte o tópico [AWS Javadoc](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html) para Classe. AmazonKinesisClient

Para baixar o Java KCL de GitHub, acesse a [Kinesis Client Library (](https://github.com/awslabs/amazon-kinesis-client)Java). Para localizar a KCL Java no Apache Maven, acesse a página de [resultados da pesquisa de KCL](https://search.maven.org/#search|ga|1|amazon-kinesis-client). Para baixar o código de amostra para um aplicativo consumidor Java KCL em GitHub, acesse a página do [projeto de amostra KCL for Java](https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis) em. GitHub 

O aplicativo de exemplo usa [Apache Commons Logging](http://commons.apache.org/proper/commons-logging/guide.html). É possível alterar a configuração do registro em log no método `configure` estático definido no arquivo `AmazonKinesisApplicationSample.java`. *Para obter mais informações sobre como usar o Apache Commons Logging com aplicativos Log4j e AWS Java, consulte [Logging with Log4j](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/java-dg-logging.html) no Guia do desenvolvedor.AWS SDK para Java *

É necessário concluir as seguintes tarefas ao implementar uma aplicação de consumo da KCL em Java:

**Topics**
+ [Implemente os métodos IRecord do processador](#kinesis-record-processor-implementation-interface-java)
+ [Implemente uma fábrica de classes para a interface IRecord do processador](#kinesis-record-processor-implementation-factory-java)
+ [Criar um operador](#kcl-java-worker)
+ [Modificar as propriedades de configuração](#kinesis-record-processor-initialization-java)
+ [Migrar para a versão 2 da interface do processador de registros](#kcl-java-v2-migration)

## Implemente os métodos IRecord do processador
<a name="kinesis-record-processor-implementation-interface-java"></a>

Atualmente, a KCL oferece suporte a duas versões da interface do `IRecordProcessor`: a interface original está disponível com a primeira versão da KCL e a versão 2 está disponível desde a versão 1.5.0. As duas interfaces são totalmente compatíveis. A escolha depende dos requisitos de cenário específicos. Consulte os Javadocs criados localmente ou o código-fonte para ver todas as diferenças. As seções a seguir descrevem a implementação mínima para os conceitos básicos.

**Topics**
+ [Interface original (versão 1)](#kcl-java-interface-original)
+ [Interface atualizada (versão 2)](#kcl-java-interface-v2)

### Interface original (versão 1)
<a name="kcl-java-interface-original"></a>

A interface `IRecordProcessor` original (`package com.amazonaws.services.kinesis.clientlibrary.interfaces`) expõe os seguintes métodos de processador de registros que o consumidor precisa implementar. O exemplo fornece implementações que podem ser usadas como ponto de partida (consulte `AmazonKinesisApplicationSampleRecordProcessor.java`).

```
public void initialize(String shardId)
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

**inicializar**  
A KCL chama o método `initialize` quando o processador de registros é instanciado, passando um ID de fragmento específico como um parâmetro. Esse processador de registros processa apenas esse fragmento e, normalmente, o inverso também é verdadeiro (esse fragmento é processado somente por esse processador de registro). No entanto, a aplicação de consumo deve considerar a possibilidade de que um registro de dados pode ser processado mais de uma vez. A semântica do Kinesis Data Streams é do tipo *pelo menos uma vez*, o que significa que cada registro de dados de um fragmento é processado pelo menos uma vez por um operador na aplicação de consumo. Para obter mais informações sobre casos em que um fragmento específico pode ser processado por mais de um operador, consulte [Use refragmentação, escalonamento e processamento paralelo para alterar o número de fragmentos](kinesis-record-processor-scaling.md).

```
public void initialize(String shardId)
```

**processRecords**  
A KCL chama o método `processRecords` passando uma lista de registros de dados do fragmento especificado pelo método `initialize(shardId)`. O processador de registros processa os dados nesses registros de acordo com a semântica da aplicação de consumo. Por exemplo, o operador pode executar uma transformação nos dados e, em seguida, armazenar o resultado em um bucket do Amazon Simple Storage Service (Amazon S3).

```
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) 
```

Além dos dados em si, o registro também contém um número de sequência e uma chave de partição. O operador pode usar esses valores ao processar os dados. Por exemplo, o operador pode escolher o bucket do S3 no qual armazenar os dados com base no valor da chave de partição. A classe `Record` expõe os seguintes métodos que oferecem acesso aos dados do registro, número de sequência e chave de partição. 

```
record.getData()  
record.getSequenceNumber() 
record.getPartitionKey()
```

No exemplo, o método privado `processRecordsWithRetries` tem código que mostra como um operador pode acessar os dados do registro, o número de sequência e a chave de partição.

O Kinesis Data Streams requer que o processador de registros rastreie os registros que já foram processados em um fragmento. A KCL faz esse rastreamento passando um checkpointer (`IRecordProcessorCheckpointer`) para o `processRecords`. O processador de registros chama o método `checkpoint` nesta interface para informar a KCL sobre o progresso do processamento dos registros no fragmento. Se o operador falhar, a KCL usará essas informações para reiniciar o processamento do fragmento no último registro processado conhecido.

Em uma operação de divisão ou mesclagem, a KCL só começará a processar os novos fragmentos quando os processadores dos fragmentos originais chamarem `checkpoint` para indicar que o processamento dos fragmentos originais foi concluído.

Se nenhum parâmetro for fornecido, a KCL presumirá que a chamada para `checkpoint` significa que todos os registros foram processados até o último registro passado para o processador de registros. Portanto, o processador de registros deve chamar `checkpoint` somente após ter processado todos os registros na lista que foi passada a ele. Os processadores de registros não precisam chamar `checkpoint` em cada chamada para `processRecords`. Um processador pode, por exemplo, chamar `checkpoint` a cada terceira chamada para `processRecords`. É possível, opcionalmente, especificar o número de sequência exato de um registro como um parâmetro para `checkpoint`. Nesse caso, a KCL presume que todos os registros foram processados somente até o registro especificado.

No exemplo, o método privado `checkpoint` mostra como chamar `IRecordProcessorCheckpointer.checkpoint` usando a lógica de novas tentativas e o tratamento de exceções apropriados.

A KCL depende do `processRecords` para lidar com qualquer exceção ocorrida no processamento dos registros de dados. Se ocorrer uma exceção em `processRecords`, a KCL ignorará os registros de dados passados antes da exceção. Ou seja, esses registros não serão reenviados para o processador de registros que lançou a exceção ou para qualquer outro processador de registros na aplicação de consumo.

**shutdown**  
A KCL chama o método `shutdown` quando o processamento termina (o motivo do desligamento é `TERMINATE`) ou quando o operador não está mais respondendo (o motivo do desligamento é `ZOMBIE`).

```
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
```

O processamento termina quando o processador de registros não recebe mais registros do fragmento porque ele foi dividido ou intercalado, ou o fluxo foi excluído.

A KCL também passa uma interface do `IRecordProcessorCheckpointer` para `shutdown`. Se o motivo do desligamento é `TERMINATE`, o processador de registros deve terminar o processamento de todos os registros de dados e, em seguida, chamar o método `checkpoint` nesta interface.

### Interface atualizada (versão 2)
<a name="kcl-java-interface-v2"></a>

A interface `IRecordProcessor` atualizada (`package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`) expõe os seguintes métodos de processador de registros que o consumidor precisa implementar: 

```
void initialize(InitializationInput initializationInput)
void processRecords(ProcessRecordsInput processRecordsInput)
void shutdown(ShutdownInput shutdownInput)
```

Todos os argumentos da versão original da interface podem ser acessados por meio de métodos get nos objetos de contêiner. Por exemplo, para recuperar a lista de registros em `processRecords()`, pode-se usar `processRecordsInput.getRecords()`.

Além das entradas fornecidas pela interface original, estas novas entradas estão disponíveis a partir da versão 2 da interface (KCL 1.5.0 e posterior):

número de sequência inicial  
No objeto `InitializationInput` passado para a operação `initialize()`, o número de sequência inicial a partir do qual os registros seriam fornecidos à instância do processador de registros. Esse é o número de sequência que foi verificado pela última vez pela instância do processador de registros que processou anteriormente o mesmo fragmento. Isso será fornecido no caso de o aplicativo precisar de informações. 

número de sequência do ponto de verificação pendente  
No objeto `InitializationInput` passado para a operação `initialize()`, o número de sequência de verificação pendente (se houver) que não pôde ser confirmado antes que a instância do processador de registros anterior parasse.

## Implemente uma fábrica de classes para a interface IRecord do processador
<a name="kinesis-record-processor-implementation-factory-java"></a>

Também será necessário implementar uma fábrica para a classe que implementa os métodos do processador de registros. Quando a aplicação de consumo instancia o operador, ela passa uma referência a essa fábrica.

O exemplo implementa a classe de fábrica no arquivo `AmazonKinesisApplicationSampleRecordProcessorFactory.java` usando a interface de processador de registros original. Para que a fábrica da classe crie a versão 2 dos processadores de registros, deve-se usar o nome do pacote `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2`.

```
  public class SampleRecordProcessorFactory implements IRecordProcessorFactory { 
      /**
      * Constructor.
      */
      public SampleRecordProcessorFactory() {
          super();
      }
      /**
      * {@inheritDoc}
      */
      @Override
      public IRecordProcessor createProcessor() {
          return new SampleRecordProcessor();
      }
  }
```

## Criar um operador
<a name="kcl-java-worker"></a>

Como discutido em [Implemente os métodos IRecord do processador](#kinesis-record-processor-implementation-interface-java), há duas versões da interface do processador de registros da KCL para escolha, o que afeta a forma de criar um operador. A interface do processador de registros original usa a seguinte estrutura de código para criar um operador:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker(recordProcessorFactory, config);
```

Com a versão 2 da interface do processador de registros, é possível usar `Worker.Builder` para criar um operador sem a necessidade de se preocupar com qual construtor usar e a ordem dos argumentos. A interface do processador de registros atualizada usa a seguinte estrutura de código para criar um operador:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

## Modificar as propriedades de configuração
<a name="kinesis-record-processor-initialization-java"></a>

O exemplo fornece valores padrão para propriedades de configuração. Esses dados de configuração para o operador são então consolidados em um objeto `KinesisClientLibConfiguration`. Esse objeto e uma referência à fábrica de classe para `IRecordProcessor` são passados na chamada que instancia o operador. É possível substituir qualquer uma dessas propriedades por seus próprios valores usando um arquivo de propriedades do Java (consulte `AmazonKinesisApplicationSample.java`).

### Nome da aplicação
<a name="configuration-property-application-name"></a>

A KCL exige um nome de aplicação exclusivo entre as aplicações e as tabelas do Amazon DynamoDB na mesma região. Ela usa o valor de configuração de nome de aplicativo das seguintes formas:
+ Presume-se que todos os operadores associados com esse nome de aplicativo estejam trabalhando juntos no mesmo fluxo. Esses operadores podem ser distribuídos em várias instâncias. Ao executar uma instância adicional do mesmo código da aplicação, mas com um nome diferente, a KCL tratará a segunda instância como uma aplicação totalmente independente operando no mesmo fluxo.
+ A KCL cria uma tabela do DynamoDB com o nome da aplicação e usa essa tabela para manter informações de estado (como pontos de verificação e mapeamento de operador-fragmento) da aplicação. Cada aplicação tem sua própria tabela do DynamoDB. Para obter mais informações, consulte [Usar uma tabela de concessões para monitorar os fragmentos processados pela aplicação de consumo da KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configurar credenciais
<a name="kinesis-record-processor-cred-java"></a>

Você deve disponibilizar suas AWS credenciais para um dos provedores de credenciais na cadeia de provedores de credenciais padrão. Por exemplo, ao executar a aplicação de consumo em uma instância do EC2, recomenda-se que a instância seja executada com um perfil do IAM. As credenciais da AWS que refletem as permissões associadas a esse perfil do IAM são disponibilizadas às aplicações na instância por meio dos metadados da instância. Essa é a maneira mais segura de gerenciar credenciais para uma aplicação de consumo em execução em uma instância do EC2.

A aplicação de exemplo primeiro tenta recuperar as credenciais do IAM nos metadados da instância: 

```
credentialsProvider = new InstanceProfileCredentialsProvider(); 
```

Se a aplicação de exemplo não consegue obter credenciais dos metadados da instância, ele tenta recuperar as credenciais de um arquivo de propriedades:

```
credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
```

Para obter mais informações sobre os metadados da instância, consulte [Metadados da instância](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html) no *Guia do usuário do Amazon EC2*.

### Usar o ID do operador para várias instâncias
<a name="kinesis-record-processor-workerid-java"></a>

O código de inicialização de exemplo cria um ID para o operador, `workerId`, usando o nome do computador local e anexando um identificador exclusivo globalmente, conforme mostrado no seguinte trecho de código. Essa abordagem é compatível com o cenário de várias instâncias da aplicação de consumo em execução em um único computador.

```
String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
```

## Migrar para a versão 2 da interface do processador de registros
<a name="kcl-java-v2-migration"></a>

Para migrar o código que usa a interface original, além das etapas descritas anteriormente, as seguintes etapas serão necessárias:

1. Altere a classe do processador de registros para importar a versão 2 da interface do processador de registros:

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   ```

1. Altere as referências para as entradas para usar métodos `get` nos objetos de contêiner. Por exemplo, na operação `shutdown()`, altere "`checkpointer`" para "`shutdownInput.getCheckpointer()`".

1. Altere a classe da fábrica do processador de registros para importar a versão 2 da interface da fábrica do processador de registros:

   ```
   import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   ```

1. Altere a construção do operador para usar `Worker.Builder`. Por exemplo:

   ```
   final Worker worker = new Worker.Builder()
       .recordProcessorFactory(recordProcessorFactory)
       .config(config)
       .build();
   ```

# Desenvolver uma aplicação de consumo da Kinesis Client Library em Node.js
<a name="kinesis-record-processor-implementation-app-nodejs"></a>

**Importante**  
As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. O KCL 1.x chegará end-of-support em 30 de janeiro de 2026. É **altamente recomendável** que você migre suas aplicações da KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a página da [biblioteca de cliente do Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) em. GitHub Para obter informações sobre as versões mais recentes da KCL, consulte [Usar a Kinesis Client Library](kcl.md). Para ter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte [Migrar da KCL 1.x para a KCL 3.x](kcl-migration-1-3.md).

É possível usar a Kinesis Client Library (KCL) para criar aplicações que processam dados dos fluxos de dados do Kinesis. A Kinesis Client Library está disponível em várias linguagens. Este tópico discute Node.js.

A KCL é uma biblioteca Java; o suporte para linguagens diferentes de Java é fornecido usando uma interface multilíngue chamada de. *MultiLangDaemon* Esse daemon baseado em Java é executado em segundo plano quando uma linguagem de KCL diferente de Java é utilizada. Portanto, se você instalar o KCL para Node.js e escrever seu aplicativo de consumidor inteiramente em Node.js, ainda precisará do Java instalado em seu sistema por causa do MultiLangDaemon. Além disso, MultiLangDaemon tem algumas configurações padrão que você pode precisar personalizar para seu caso de uso, por exemplo, a AWS região à qual ele se conecta. Para obter mais informações sobre o MultiLangDaemon on GitHub, acesse a página do [ MultiLangDaemon projeto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Para baixar a KCL do Node.js GitHub, acesse a [Biblioteca de Cliente Kinesis (](https://github.com/awslabs/amazon-kinesis-client-nodejs)Node.js).

**Downloads de códigos de exemplo**

Há dois exemplos de código disponíveis para KCL em Node.js:
+ [basic-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/basic_sample)

  Usado nas seções a seguir para ilustrar os conceitos básicos de criação de uma aplicação de consumo da KCL em Node.js.
+ [click-stream-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/click_stream_sample)

   Levemente mais avançado e usa um cenário real, para depois que houver familiaridade com o código de exemplo básico. Esse exemplo não é discutido aqui, mas há um arquivo README com mais informações.

É necessário concluir as seguintes tarefas ao implementar uma aplicação de consumo da KCL em Node.js:

**Topics**
+ [Implementar o processador de registros](#kinesis-record-processor-implementation-interface-nodejs)
+ [Modificar as propriedades de configuração](#kinesis-record-processor-initialization-nodejs)

## Implementar o processador de registros
<a name="kinesis-record-processor-implementation-interface-nodejs"></a>

A aplicação de consumo mais simples possível usando a KCL para Node.js deve implementar uma função `recordProcessor`, que, por sua vez, contém as funções `initialize`, `processRecords` e `shutdown`. O exemplo fornece uma implementação que pode ser usada como ponto de partida (consulte `sample_kcl_app.js`).

```
function recordProcessor() {
  // return an object that implements initialize, processRecords and shutdown functions.}
```

**inicializar**  
A KCL chama a função `initialize` quando o processador de registros é iniciado. Esse processador de registros processa apenas o ID do fragmento passado como `initializeInput.shardId` e, normalmente, o inverso também é verdadeiro (esse fragmento é processado somente por esse processador de registro). No entanto, a aplicação de consumo deve considerar a possibilidade de que um registro de dados pode ser processado mais de uma vez. Isso acontece porque a semântica do Kinesis Data Streams é do tipo *pelo menos uma vez*, o que significa que cada registro de dados de um fragmento é processado pelo menos uma vez por um operador na aplicação de consumo. Para obter mais informações sobre casos em que um fragmento específico pode ser processado por mais de um operador, consulte [Use refragmentação, escalonamento e processamento paralelo para alterar o número de fragmentos](kinesis-record-processor-scaling.md).

```
initialize: function(initializeInput, completeCallback)
```

**processRecords**  
 A KCL chama essa função com uma entrada contendo uma lista de registros de dados do fragmento especificado para a função `initialize`. O processador de registros implementado processará os dados nesses registros de acordo com a semântica da aplicação de consumo. Por exemplo, o operador pode executar uma transformação nos dados e, em seguida, armazenar o resultado em um bucket do Amazon Simple Storage Service (Amazon S3). 

```
processRecords: function(processRecordsInput, completeCallback)
```

Além dos dados em si, o registro também contém um número de sequência e uma chave de partição, que o operador pode usar ao processar os dados. Por exemplo, o operador pode escolher o bucket do S3 no qual armazenar os dados com base no valor da chave de partição. O dicionário de `record` expõe os seguintes pares de chave/valor para acessar os dados do registro, o número de sequência e a chave de partição:

```
record.data
record.sequenceNumber
record.partitionKey
```

Observe que os dados são codificados em Base64.

No exemplo básico, a função `processRecords` tem código que mostra como um operador pode acessar os dados do registro, o número de sequência e a chave de partição.

O Kinesis Data Streams requer que o processador de registros rastreie os registros que já foram processados em um fragmento. A KCL faz esse rastreamento com um objeto `checkpointer` passado como `processRecordsInput.checkpointer`. O processador de registros chama a função `checkpointer.checkpoint` para informar a KCL sobre o progresso do processamento dos registros no fragmento. Se o operador falhar, a KCL usará essas informações ao reiniciar o processamento do fragmento para continuar a partir do último registro processado conhecido.

Em uma operação de divisão ou mesclagem, a KCL só começará a processar os novos fragmentos quando os processadores dos fragmentos originais chamarem `checkpoint` para indicar que o processamento dos fragmentos originais foi concluído.

Se u mnúmero de sequência não for passado para a função `checkpoint`, a KCL presumirá que a chamada para `checkpoint` significa que todos os registros foram processados até o último registro passado para o processador de registros. Portanto, o processador de registros deve chamar `checkpoint` **somente** após ter processado todos os registros na lista que foi passada para ele. Os processadores de registros não precisam chamar `checkpoint` em cada chamada para `processRecords`. Um processador pode, por exemplo, chamar `checkpoint` a cada terceira chamada ou algum evento externo ao seu processador de gravação, como um verification/validation serviço personalizado que você implementou. 

É possível, opcionalmente, especificar o número de sequência exato de um registro como um parâmetro para `checkpoint`. Nesse caso, a KCL presume que todos os registros foram processados somente até o registro especificado.

O aplicativo de exemplo básico mostra a chamada mais simples possível para a função `checkpointer.checkpoint`. É possível adicionar outra lógica de verificação que precisar para o consumidor neste ponto da função.

**shutdown**  
A KCL chama a função `shutdown` quando o processamento termina (`shutdownInput.reason` é `TERMINATE`) ou quando o operador não está mais respondendo (`shutdownInput.reason` é `ZOMBIE`).

```
shutdown: function(shutdownInput, completeCallback)
```

O processamento termina quando o processador de registros não recebe mais registros do fragmento porque ele foi dividido ou intercalado, ou o fluxo foi excluído.

A KCL também passa um objeto `shutdownInput.checkpointer` para `shutdown`. Se o motivo do desligamento for `TERMINATE`, é necessário verificar se o processador de registros terminou o processamento de todos os registros de dados e, em seguida, chamar a função `checkpoint` nessa interface.

## Modificar as propriedades de configuração
<a name="kinesis-record-processor-initialization-nodejs"></a>

O exemplo fornece valores padrão para as propriedades de configuração. É possível substituir qualquer uma dessas propriedades por seus próprios valores (consulte `sample.properties` no exemplo básico).

### Nome da aplicação
<a name="kinesis-record-processor-application-name-nodejs"></a>

A KCL exige uma aplicação exclusiva entre as aplicações e as tabelas do Amazon DynamoDB na mesma região. Ela usa o valor de configuração de nome de aplicativo das seguintes formas:
+ Presume-se que todos os operadores associados com esse nome de aplicativo estejam trabalhando juntos no mesmo fluxo. Esses operadores podem ser distribuídos em várias instâncias. Ao executar uma instância adicional do mesmo código da aplicação, mas com um nome diferente, a KCL tratará a segunda instância como uma aplicação totalmente independente operando no mesmo fluxo.
+ A KCL cria uma tabela do DynamoDB com o nome da aplicação e usa essa tabela para manter informações de estado (como pontos de verificação e mapeamento de operador-fragmento) da aplicação. Cada aplicação tem sua própria tabela do DynamoDB. Para obter mais informações, consulte [Usar uma tabela de concessões para monitorar os fragmentos processados pela aplicação de consumo da KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configurar credenciais
<a name="kinesis-record-processor-credentials-nodejs"></a>

Você deve disponibilizar suas AWS credenciais para um dos provedores de credenciais na cadeia de provedores de credenciais padrão. A propriedade `AWSCredentialsProvider` pode ser usada para definir um provedor de credenciais. O arquivo `sample.properties` precisa disponibilizar as credenciais para um dos provedores de credenciais na [cadeia de provedores de credenciais padrão](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Se você estiver executando seu consumidor em uma instância do Amazon EC2, recomendamos que você configure a instância com uma função do IAM. AWS as credenciais que refletem as permissões associadas a essa função do IAM são disponibilizadas aos aplicativos na instância por meio dos metadados da instância. Essa é a maneira mais segura de gerenciar credenciais para uma aplicação de consumo em execução em uma instância do EC2.

O exemplo a seguir configura a KCL para processar um fluxo de dados do Kinesis chamado `kclnodejssample` usando o processador de registros fornecido em `sample_kcl_app.js`:

```
# The Node.js executable script
executableName = node sample_kcl_app.js
# The name of an Amazon Kinesis stream to process
streamName = kclnodejssample
# Unique KCL application name
applicationName = kclnodejssample
# Use default AWS credentials provider chain
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
# Read from the beginning of the stream
initialPositionInStream = TRIM_HORIZON
```

# Desenvolver uma aplicação de consumo da Kinesis Client Library em .NET
<a name="kinesis-record-processor-implementation-app-dotnet"></a>

**Importante**  
As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. O KCL 1.x chegará end-of-support em 30 de janeiro de 2026. É **altamente recomendável** que você migre suas aplicações da KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a página da [biblioteca de cliente do Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) em. GitHub Para obter informações sobre as versões mais recentes da KCL, consulte [Usar a Kinesis Client Library](kcl.md). Para ter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte [Migrar da KCL 1.x para a KCL 3.x](kcl-migration-1-3.md).

É possível usar a Kinesis Client Library (KCL) para criar aplicações que processam dados dos fluxos de dados do Kinesis. A Kinesis Client Library está disponível em várias linguagens. Este tópico discute .NET.

A KCL é uma biblioteca Java; o suporte para linguagens diferentes de Java é fornecido usando uma interface multilíngue chamada de. *MultiLangDaemon* Esse daemon baseado em Java é executado em segundo plano quando uma linguagem de KCL diferente de Java é utilizada. Portanto, se você instalar o KCL para.NET e escrever seu aplicativo de consumidor inteiramente no.NET, ainda precisará do Java instalado em seu sistema por causa do MultiLangDaemon. Além disso, MultiLangDaemon tem algumas configurações padrão que você pode precisar personalizar para seu caso de uso, por exemplo, a AWS região à qual ele se conecta. Para obter mais informações sobre o MultiLangDaemon on GitHub, acesse a página do [ MultiLangDaemon projeto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Para baixar o.NET KCL de GitHub, acesse [Kinesis Client Library (](https://github.com/awslabs/amazon-kinesis-client-net).NET). Para baixar o código de amostra para um aplicativo de consumidor do.NET KCL, acesse a página do [projeto de amostra de consumidor KCL para.NET](https://github.com/awslabs/amazon-kinesis-client-net/tree/master/SampleConsumer) em. GitHub

É necessário concluir as seguintes tarefas ao implementar uma aplicação de consumo da KCL em .NET:

**Topics**
+ [Implemente os métodos da classe IRecord Processor](#kinesis-record-processor-implementation-interface-dotnet)
+ [Modificar as propriedades de configuração](#kinesis-record-processor-initialization-dotnet)

## Implemente os métodos da classe IRecord Processor
<a name="kinesis-record-processor-implementation-interface-dotnet"></a>

A aplicação de consumo precisa implementar os seguintes métodos para `IRecordProcessor`. A aplicação de consumo de exemplo fornece implementações que podem ser usadas como ponto de partida (consulte a classe `SampleRecordProcessor` em `SampleConsumer/AmazonKinesisSampleConsumer.cs`).

```
public void Initialize(InitializationInput input)
public void ProcessRecords(ProcessRecordsInput input)
public void Shutdown(ShutdownInput input)
```

**Inicializar**  
A KCL chama este método quando o processador de registros é instanciado, passando um ID de fragmento específico no parâmetro `input` (`input.ShardId`). Esse processador de registros processa apenas esse fragmento e, normalmente, o inverso também é verdadeiro (esse fragmento é processado somente por esse processador de registro). No entanto, a aplicação de consumo deve considerar a possibilidade de que um registro de dados pode ser processado mais de uma vez. Isso acontece porque a semântica do Kinesis Data Streams é do tipo *pelo menos uma vez*, o que significa que cada registro de dados de um fragmento é processado pelo menos uma vez por um operador na aplicação de consumo. Para obter mais informações sobre casos em que um fragmento específico pode ser processado por mais de um operador, consulte [Use refragmentação, escalonamento e processamento paralelo para alterar o número de fragmentos](kinesis-record-processor-scaling.md).

```
public void Initialize(InitializationInput input)
```

**ProcessRecords**  
A KCL chama este método passando uma lista de registros de dados no parâmetro `input` (`input.Records`) do fragmento especificado pelo método `Initialize`. O processador de registros implementado processará os dados nesses registros de acordo com a semântica da aplicação de consumo. Por exemplo, o operador pode executar uma transformação nos dados e, em seguida, armazenar o resultado em um bucket do Amazon Simple Storage Service (Amazon S3).

```
public void ProcessRecords(ProcessRecordsInput input)
```

Além dos dados em si, o registro também contém um número de sequência e uma chave de partição. O operador pode usar esses valores ao processar os dados. Por exemplo, o operador pode escolher o bucket do S3 no qual armazenar os dados com base no valor da chave de partição. A classe `Record` expõe os seguintes itens para acessar os dados do registro, o número de sequência e a chave de partição:

```
byte[] Record.Data 
string Record.SequenceNumber
string Record.PartitionKey
```

No exemplo, o método `ProcessRecordsWithRetries` tem código que mostra como um operador pode acessar os dados do registro, o número de sequência e a chave de partição.

O Kinesis Data Streams requer que o processador de registros rastreie os registros que já foram processados em um fragmento. A KCL faz esse rastreamento, passando um objeto `Checkpointer` para `ProcessRecords` (`input.Checkpointer`). O processador de registros chama o método `Checkpointer.Checkpoint` para informar a KCL sobre o progresso do processamento dos registros no fragmento. Se o operador falhar, a KCL usará essas informações para reiniciar o processamento do fragmento no último registro processado conhecido.

Em uma operação de divisão ou mesclagem, a KCL só começará a processar os novos fragmentos quando os processadores dos fragmentos originais chamarem `Checkpointer.Checkpoint` para indicar que o processamento dos fragmentos originais foi concluído.

Se nenhum parâmetro for fornecido, a KCL presumirá que a chamada para `Checkpointer.Checkpoint` significa que todos os registros foram processados até o último registro passado para o processador de registros. Portanto, o processador de registros deve chamar `Checkpointer.Checkpoint` somente após ter processado todos os registros na lista que foi passada a ele. Os processadores de registros não precisam chamar `Checkpointer.Checkpoint` em cada chamada para `ProcessRecords`. Um processador pode, por exemplo, chamar `Checkpointer.Checkpoint` a cada terceira ou quarta chamada. É possível, opcionalmente, especificar o número de sequência exato de um registro como um parâmetro para `Checkpointer.Checkpoint`. Nesse caso, a KCL presume que os registros foram processados somente até o registro especificado.

No exemplo, o método privado `Checkpoint(Checkpointer checkpointer)` mostra como chamar o método `Checkpointer.Checkpoint` usando a lógica de novas tentativas e o tratamento de exceções apropriados.

De maneira diferente das bibliotecas KCL em outras linguagens, a KCL para .NET não lida com nenhuma exceção ocorrida no processamento dos registros de dados. As exceções não detectadas do código do usuário causam uma falha no programa.

**Shutdown**  
A KCL chama o método `Shutdown` quando o processamento termina (o motivo do desligamento é `TERMINATE`) ou quando o operador não está mais respondendo (o valor `input.Reason` do desligamento é `ZOMBIE`).

```
public void Shutdown(ShutdownInput input)
```

O processamento termina quando o processador de registros não recebe mais registros do fragmento porque ele foi dividido ou intercalado, ou o fluxo foi excluído.

A KCL também passa um objeto `Checkpointer` para `shutdown`. Se o motivo do desligamento é `TERMINATE`, o processador de registros deve terminar o processamento de todos os registros de dados e, em seguida, chamar o método `checkpoint` nesta interface.

## Modificar as propriedades de configuração
<a name="kinesis-record-processor-initialization-dotnet"></a>

A aplicação de consumo de exemplo fornece valores padrão para as propriedades de configuração. É possível substituir qualquer uma dessas propriedades por seus próprios valores (consulte `SampleConsumer/kcl.properties`).

### Nome da aplicação
<a name="modify-kinesis-record-processor-application-name"></a>

A KCL exige uma aplicação exclusiva entre as aplicações e as tabelas do Amazon DynamoDB na mesma região. Ela usa o valor de configuração de nome de aplicativo das seguintes formas:
+ Presume-se que todos os operadores associados com esse nome de aplicativo estejam trabalhando juntos no mesmo fluxo. Esses operadores podem ser distribuídos em várias instâncias. Ao executar uma instância adicional do mesmo código da aplicação, mas com um nome diferente, a KCL tratará a segunda instância como uma aplicação totalmente independente operando no mesmo fluxo.
+ A KCL cria uma tabela do DynamoDB com o nome da aplicação e usa essa tabela para manter informações de estado (como pontos de verificação e mapeamento de operador-fragmento) da aplicação. Cada aplicação tem sua própria tabela do DynamoDB. Para obter mais informações, consulte [Usar uma tabela de concessões para monitorar os fragmentos processados pela aplicação de consumo da KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configurar credenciais
<a name="kinesis-record-processor-creds-dotnet"></a>

Você deve disponibilizar suas AWS credenciais para um dos provedores de credenciais na cadeia de provedores de credenciais padrão. A propriedade `AWSCredentialsProvider` pode ser usada para definir um provedor de credenciais. As [sample.properties](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) precisam disponibilizar as credenciais para um dos provedores de credenciais na [cadeia de provedores de credenciais padrão](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Ao executar a aplicação de consumo em uma instância do EC2, recomenda-se configurar a instância com um perfil do IAM. As credenciais da AWS que refletem as permissões associadas a esse perfil do IAM são disponibilizadas às aplicações na instância por meio dos metadados da instância. Essa é a maneira mais segura de gerenciar credenciais para uma aplicação de consumo em execução em uma instância do EC2.

O arquivo de propriedades do exemplo configura a KCL para processar um fluxo de dados do Kinesis chamado “words” usando o processador de registros fornecido em `AmazonKinesisSampleConsumer.cs`. 

# Desenvolver uma aplicação de consumo da Kinesis Client Library em Python
<a name="kinesis-record-processor-implementation-app-py"></a>

**Importante**  
As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. O KCL 1.x chegará end-of-support em 30 de janeiro de 2026. É **altamente recomendável** que você migre suas aplicações da KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a página da [biblioteca de cliente do Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) em. GitHub Para obter informações sobre as versões mais recentes da KCL, consulte [Usar a Kinesis Client Library](kcl.md). Para ter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte [Migrar da KCL 1.x para a KCL 3.x](kcl-migration-1-3.md).

É possível usar a Kinesis Client Library (KCL) para criar aplicações que processam dados dos fluxos de dados do Kinesis. A Kinesis Client Library está disponível em várias linguagens. Este tópico discute Python.

A KCL é uma biblioteca Java; o suporte para linguagens diferentes de Java é fornecido usando uma interface multilíngue chamada de. *MultiLangDaemon* Esse daemon baseado em Java é executado em segundo plano quando uma linguagem de KCL diferente de Java é utilizada. Portanto, se você instalar o KCL para Python e escrever seu aplicativo de consumidor inteiramente em Python, ainda precisará do Java instalado em seu sistema por causa do. MultiLangDaemon Além disso, MultiLangDaemon tem algumas configurações padrão que você pode precisar personalizar para seu caso de uso, por exemplo, a AWS região à qual ele se conecta. Para obter mais informações sobre o MultiLangDaemon on GitHub, acesse a página do [ MultiLangDaemon projeto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Para baixar o Python KCL em GitHub, acesse a [Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client-python) (Python). Para baixar o código de amostra para um aplicativo de consumidor do Python KCL, acesse a página do projeto de amostra do [KCL for Python](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples) em. GitHub

É necessário concluir as seguintes tarefas ao implementar uma aplicação de consumo da KCL em Python:

**Topics**
+ [Implemente os métodos RecordProcessor de classe](#kinesis-record-processor-implementation-interface-py)
+ [Modificar as propriedades de configuração](#kinesis-record-processor-initialization-py)

## Implemente os métodos RecordProcessor de classe
<a name="kinesis-record-processor-implementation-interface-py"></a>

A classe `RecordProcess` precisa estender o `RecordProcessorBase` para implementar os métodos a seguir. O exemplo fornece implementações que podem ser usadas como ponto de partida (consulte `sample_kclpy_app.py`).

```
def initialize(self, shard_id)
def process_records(self, records, checkpointer)
def shutdown(self, checkpointer, reason)
```

**inicializar**  
A KCL chama o método `initialize` quando o processador de registros é instanciado, passando um ID de fragmento específico como um parâmetro. Esse processador de registros processa apenas esse fragmento e, normalmente, o inverso também é verdadeiro (esse fragmento é processado somente por esse processador de registro). No entanto, a aplicação de consumo deve considerar a possibilidade de que um registro de dados pode ser processado mais de uma vez. Isso acontece porque a semântica do Kinesis Data Streams é do tipo *pelo menos uma vez*, o que significa que cada registro de dados de um fragmento é processado pelo menos uma vez por um operador no consumidor. Para obter mais informações sobre casos em que um fragmento específico pode ser processado por mais de um operador, consulte [Use refragmentação, escalonamento e processamento paralelo para alterar o número de fragmentos](kinesis-record-processor-scaling.md).

```
def initialize(self, shard_id)
```

**process\$1records**  
 A KCL chama este método passando uma lista de registros de dados do fragmento especificado pelo método `initialize`. O processador de registros implementado processará os dados nesses registros de acordo com a semântica da aplicação de consumo. Por exemplo, o operador pode executar uma transformação nos dados e, em seguida, armazenar o resultado em um bucket do Amazon Simple Storage Service (Amazon S3).

```
def process_records(self, records, checkpointer) 
```

Além dos dados em si, o registro também contém um número de sequência e uma chave de partição. O operador pode usar esses valores ao processar os dados. Por exemplo, o operador pode escolher o bucket do S3 no qual armazenar os dados com base no valor da chave de partição. O dicionário de `record` expõe os seguintes pares de chave/valor para acessar os dados do registro, o número de sequência e a chave de partição:

```
record.get('data')
record.get('sequenceNumber')
record.get('partitionKey')
```

Observe que os dados são codificados em Base64.

No exemplo, o método `process_records` tem código que mostra como um operador pode acessar os dados do registro, o número de sequência e a chave de partição.

O Kinesis Data Streams requer que o processador de registros rastreie os registros que já foram processados em um fragmento. A KCL faz esse rastreamento, passando um objeto `Checkpointer` para `process_records`. O processador de registros chama o método `checkpoint` neste objeto para informar a KCL sobre o progresso do processamento dos registros no fragmento. Se o operador falhar, a KCL usará essas informações para reiniciar o processamento do fragmento no último registro processado conhecido.

Em uma operação de divisão ou mesclagem, a KCL só começará a processar os novos fragmentos quando os processadores dos fragmentos originais chamarem `checkpoint` para indicar que o processamento dos fragmentos originais foi concluído.

Se você não passar um parâmetro, a KCL presumirá que a chamada para `checkpoint` significa que todos os registros foram processados até o último registro passado para o processador de registros. Portanto, o processador de registros deve chamar `checkpoint` somente após ter processado todos os registros na lista que foi passada a ele. Os processadores de registros não precisam chamar `checkpoint` em cada chamada para `process_records`. Um processador pode, por exemplo, chamar `checkpoint` a cada terceira chamada. É possível, opcionalmente, especificar o número de sequência exato de um registro como um parâmetro para `checkpoint`. Nesse caso, a KCL presume que todos os registros foram processados somente até o registro especificado.

No exemplo, o método privado `checkpoint` mostra como chamar o método `Checkpointer.checkpoint` usando a lógica de novas tentativas e o tratamento de exceções apropriados.

A KCL depende do `process_records` para lidar com qualquer exceção ocorrida no processamento dos registros de dados. Se ocorrer uma exceção em `process_records`, a KCL ignorará os registros de dados passados para `process_records` antes da exceção. Ou seja, esses registros não serão reenviados para o processador de registros que lançou a exceção ou para qualquer outro processador de registros na aplicação de consumo.

**shutdown**  
 A KCL chama o método `shutdown` quando o processamento termina (o motivo do desligamento é `TERMINATE`) ou quando o operador não está mais respondendo (o `reason` do desligamento é `ZOMBIE`).

```
def shutdown(self, checkpointer, reason)
```

O processamento termina quando o processador de registros não recebe mais registros do fragmento porque ele foi dividido ou intercalado, ou o fluxo foi excluído.

 A KCL também passa um objeto `Checkpointer` para `shutdown`. Se o `reason` do desligamento é `TERMINATE`, o processador de registros deve terminar o processamento de todos os registros de dados e, em seguida, chamar o método `checkpoint` nesta interface.

## Modificar as propriedades de configuração
<a name="kinesis-record-processor-initialization-py"></a>

O exemplo fornece valores padrão para as propriedades de configuração. É possível substituir qualquer uma dessas propriedades por seus próprios valores (consulte `sample.properties`).

### Nome da aplicação
<a name="kinesis-record-processor-application-name-py"></a>

A KCL exige um nome de aplicação exclusivo entre as aplicações e as tabelas do Amazon DynamoDB na mesma região. Ela usa o valor de configuração de nome de aplicativo das seguintes formas:
+ Presume-se que todos os operadores associados a esse nome de aplicativo estejam trabalhando juntos no mesmo fluxo. Esses operadores podem ser distribuídos em várias instâncias. Se você executar uma instância adicional do mesmo código da aplicação, mas com um nome diferente, a KCL tratará a segunda instância como uma aplicação totalmente independente operando no mesmo fluxo.
+ A KCL cria uma tabela do DynamoDB com o nome da aplicação e usa essa tabela para manter informações de estado (como pontos de verificação e mapeamento de operador-fragmento) da aplicação. Cada aplicação tem sua própria tabela do DynamoDB. Para obter mais informações, consulte [Usar uma tabela de concessões para monitorar os fragmentos processados pela aplicação de consumo da KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Configurar credenciais
<a name="kinesis-record-processor-creds-py"></a>

Você deve disponibilizar suas AWS credenciais para um dos provedores de credenciais na cadeia de provedores de credenciais padrão. A propriedade `AWSCredentialsProvider` pode ser usada para definir um provedor de credenciais. As [sample.properties](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) precisam disponibilizar as credenciais para um dos provedores de credenciais na [cadeia de provedores de credenciais padrão](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). Ao executar a aplicação de consumo em uma instância do Amazon EC2, recomenda-se que a instância seja configurada com um perfil do IAM. As credenciais da AWS que refletem as permissões associadas a esse perfil do IAM são disponibilizadas às aplicações na instância por meio dos metadados da instância. Essa é a maneira mais segura de gerenciar credenciais para uma aplicação de consumo em execução em uma instância do EC2.

O arquivo de propriedades do exemplo configura a KCL para processar um fluxo de dados do Kinesis chamado “words” usando o processador de registros fornecido em `sample_kclpy_app.py`. 

# Desenvolver uma aplicação de consumo da Kinesis Client Library em Ruby
<a name="kinesis-record-processor-implementation-app-ruby"></a>

**Importante**  
As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. O KCL 1.x chegará end-of-support em 30 de janeiro de 2026. É **altamente recomendável** que você migre suas aplicações da KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a página da [biblioteca de cliente do Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) em. GitHub Para obter informações sobre as versões mais recentes da KCL, consulte [Usar a Kinesis Client Library](kcl.md). Para ter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte [Migrar da KCL 1.x para a KCL 3.x](kcl-migration-1-3.md).

É possível usar a Kinesis Client Library (KCL) para criar aplicações que processam dados dos fluxos de dados do Kinesis. A Kinesis Client Library está disponível em várias linguagens. Este tópico discute Ruby.

A KCL é uma biblioteca Java; o suporte para linguagens diferentes de Java é fornecido usando uma interface multilíngue chamada de. *MultiLangDaemon* Esse daemon baseado em Java é executado em segundo plano quando uma linguagem de KCL diferente de Java é utilizada. Portanto, se você instalar o KCL para Ruby e escrever seu aplicativo de consumo inteiramente em Ruby, ainda precisará do Java instalado em seu sistema por causa do. MultiLangDaemon Além disso, MultiLangDaemon tem algumas configurações padrão que você pode precisar personalizar para seu caso de uso, por exemplo, a AWS região à qual ele se conecta. Para obter mais informações sobre o MultiLangDaemon on GitHub, acesse a página do [ MultiLangDaemon projeto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Para baixar a KCL do Ruby GitHub, acesse a [Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client-ruby) (Ruby). Para baixar o código de amostra para um aplicativo de consumidor Ruby KCL, acesse a página do projeto de [amostra KCL for Ruby](https://github.com/awslabs/amazon-kinesis-client-ruby/tree/master/samples) em. GitHub

Para obter mais informações sobre a biblioteca de suporte da KCL Ruby, consulte a [documentação da KCL para gems da Ruby](http://www.rubydoc.info/gems/aws-kclrb).

# Desenvolver aplicações de consumo da KCL 2.x
<a name="developing-consumers-with-kcl-v2"></a>

**Importante**  
As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. O KCL 1.x chegará end-of-support em 30 de janeiro de 2026. É **altamente recomendável** que você migre suas aplicações da KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a página da [biblioteca de cliente do Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) em. GitHub Para obter informações sobre as versões mais recentes da KCL, consulte [Usar a Kinesis Client Library](kcl.md). Para ter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte [Migrar da KCL 1.x para a KCL 3.x](kcl-migration-1-3.md).

Este tópico mostra como usar a versão 2.0 da Kinesis Client Library (KCL). 

Para obter mais informações sobre a KCL, consulte a visão geral fornecida em [Developing Consumers Using the Kinesis Client Library 1.x](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html).

Escolha um dos seguintes tópicos, dependendo do que deseja usar.

**Topics**
+ [Desenvolver uma aplicação de consumo da Kinesis Client Library em Java](kcl2-standard-consumer-java-example.md)
+ [Desenvolver uma aplicação de consumo da Kinesis Client Library em Python](kcl2-standard-consumer-python-example.md)
+ [Desenvolver consumidores de distribuição avançada com o KCL 2.x](building-enhanced-consumers-kcl-retired.md)

# Desenvolver uma aplicação de consumo da Kinesis Client Library em Java
<a name="kcl2-standard-consumer-java-example"></a>

**Importante**  
As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. O KCL 1.x chegará end-of-support em 30 de janeiro de 2026. É **altamente recomendável** que você migre suas aplicações da KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a página da [biblioteca de cliente do Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) em. GitHub Para obter informações sobre as versões mais recentes da KCL, consulte [Usar a Kinesis Client Library](kcl.md). Para ter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte [Migrar da KCL 1.x para a KCL 3.x](kcl-migration-1-3.md).

O código a seguir mostra uma implementação de exemplo em Java de `ProcessorFactory` e `RecordProcessor`. Para aproveitar o recurso de distribuição avançada, consulte [Usar consumidores com distribuição avançada](https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-kcl-java.html).

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License.
 */


/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

/**
 * This class will run a simple app that uses the KCL to read data and uses the AWS SDK to publish data.
 * Before running this program you must first create a Kinesis stream through the AWS console or AWS SDK.
 */
public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    /**
     * Invoke the main method with 2 args: the stream name and (optionally) the region.
     * Verifies valid inputs and then starts running the app.
     */
    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    /**
     * Constructor sets streamName and region. It also creates a KinesisClient object to send data to Kinesis.
     * This KinesisClient is used to send dummy data so that the consumer has something to read; it is also used
     * indirectly by the KCL to handle the consumption of the data.
     */
    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {

        /**
         * Sends dummy data to Kinesis. Not relevant to consuming the data with the KCL
         */
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        /**
         * Sets up configuration for the KCL, including DynamoDB and CloudWatch dependencies. The final argument, a
         * ShardRecordProcessorFactory, is where the logic for record processing lives, and is located in a private
         * class below.
         */
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        /**
         * The Scheduler (also called Worker in earlier versions of the KCL) is the entry point to the KCL. This
         * instance is configured with defaults provided by the ConfigsBuilder.
         */
        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );

        /**
         * Kickoff the Scheduler. Record processing of the stream of dummy data will continue indefinitely
         * until an exit is triggered.
         */
        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        /**
         * Allows termination of app by pressing Enter.
         */
        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        /**
         * Stops sending dummy data.
         */
        log.info("Cancelling producer and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        /**
         * Stops consuming data. Finishes processing the current batch of data already received from Kinesis
         * before shutting down.
         */
        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown.  Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    /**
     * Sends a single record of dummy data to Kinesis.
     */
    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }

    /**
     * The implementation of the ShardRecordProcessor interface is where the heart of the record processing logic lives.
     * In this example all we do to 'process' is log info about the records.
     */
    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        /**
         * Invoked by the KCL before data records are delivered to the ShardRecordProcessor instance (via
         * processRecords). In this example we do nothing except some logging.
         *
         * @param initializationInput Provides information related to initialization.
         */
        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Handles record processing logic. The Amazon Kinesis Client Library will invoke this method to deliver
         * data records to the application. In this example we simply log our records.
         *
         * @param processRecordsInput Provides the records to be processed as well as information and capabilities
         *                            related to them (e.g. checkpointing).
         */
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /** Called when the lease tied to this record processor has been lost. Once the lease has been lost,
         * the record processor can no longer checkpoint.
         *
         * @param leaseLostInput Provides access to functions and data related to the loss of the lease.
         */
        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Called when all data on this shard has been processed. Checkpointing must occur in the method for record
         * processing to be considered complete; an exception will be thrown otherwise.
         *
         * @param shardEndedInput Provides access to a checkpointer method for completing processing of the shard.
         */
        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        /**
         * Invoked when Scheduler has been requested to shut down (i.e. we decide to stop running the app by pressing
         * Enter). Checkpoints and logs the data a final time.
         *
         * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
         *                               before the shutdown is completed.
         */
        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```

# Desenvolver uma aplicação de consumo da Kinesis Client Library em Python
<a name="kcl2-standard-consumer-python-example"></a>

**Importante**  
As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. O KCL 1.x chegará end-of-support em 30 de janeiro de 2026. É **altamente recomendável** que você migre suas aplicações da KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a página da [biblioteca de cliente do Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) em. GitHub Para obter informações sobre as versões mais recentes da KCL, consulte [Usar a Kinesis Client Library](kcl.md). Para ter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte [Migrar da KCL 1.x para a KCL 3.x](kcl-migration-1-3.md).

É possível usar a Kinesis Client Library (KCL) para criar aplicações que processam dados dos fluxos de dados do Kinesis. A Kinesis Client Library está disponível em várias linguagens. Este tópico discute Python.

A KCL é uma biblioteca Java; o suporte para linguagens diferentes de Java é fornecido usando uma interface multilíngue chamada de. *MultiLangDaemon* Esse daemon baseado em Java é executado em segundo plano quando uma linguagem de KCL diferente de Java é utilizada. Portanto, se você instalar o KCL para Python e escrever seu aplicativo de consumidor inteiramente em Python, ainda precisará do Java instalado em seu sistema por causa do. MultiLangDaemon Além disso, MultiLangDaemon tem algumas configurações padrão que você pode precisar personalizar para seu caso de uso, por exemplo, a AWS região à qual ele se conecta. Para obter mais informações sobre o MultiLangDaemon on GitHub, acesse a página do [ MultiLangDaemon projeto KCL](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang).

Para baixar o Python KCL em GitHub, acesse a [Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client-python) (Python). Para baixar o código de amostra para um aplicativo de consumidor do Python KCL, acesse a página do projeto de amostra do [KCL for Python](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples) em. GitHub

É necessário concluir as seguintes tarefas ao implementar uma aplicação de consumo da KCL em Python:

**Topics**
+ [Implemente os métodos RecordProcessor de classe](#kinesis-record-processor-implementation-interface-py)
+ [Modificar as propriedades de configuração](#kinesis-record-processor-initialization-py)

## Implemente os métodos RecordProcessor de classe
<a name="kinesis-record-processor-implementation-interface-py"></a>

A classe `RecordProcess` precisa estender a classe `RecordProcessorBase` para implementar os seguintes métodos:

```
initialize
process_records
shutdown_requested
```

Este exemplo fornece implementações que podem ser usadas como ponto de partida.

```
#!/usr/bin/env python

# Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Amazon Software License (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://aws.amazon.com/asl/
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

from __future__ import print_function

import sys
import time

from amazon_kclpy import kcl
from amazon_kclpy.v3 import processor


class RecordProcessor(processor.RecordProcessorBase):
    """
    A RecordProcessor processes data from a shard in a stream. Its methods will be called with this pattern:

    * initialize will be called once
    * process_records will be called zero or more times
    * shutdown will be called if this MultiLangDaemon instance loses the lease to this shard, or the shard ends due
        a scaling change.
    """
    def __init__(self):
        self._SLEEP_SECONDS = 5
        self._CHECKPOINT_RETRIES = 5
        self._CHECKPOINT_FREQ_SECONDS = 60
        self._largest_seq = (None, None)
        self._largest_sub_seq = None
        self._last_checkpoint_time = None

    def log(self, message):
        sys.stderr.write(message)

    def initialize(self, initialize_input):
        """
        Called once by a KCLProcess before any calls to process_records

        :param amazon_kclpy.messages.InitializeInput initialize_input: Information about the lease that this record
            processor has been assigned.
        """
        self._largest_seq = (None, None)
        self._last_checkpoint_time = time.time()

    def checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=None):
        """
        Checkpoints with retries on retryable exceptions.

        :param amazon_kclpy.kcl.Checkpointer checkpointer: the checkpointer provided to either process_records
            or shutdown
        :param str or None sequence_number: the sequence number to checkpoint at.
        :param int or None sub_sequence_number: the sub sequence number to checkpoint at.
        """
        for n in range(0, self._CHECKPOINT_RETRIES):
            try:
                checkpointer.checkpoint(sequence_number, sub_sequence_number)
                return
            except kcl.CheckpointError as e:
                if 'ShutdownException' == e.value:
                    #
                    # A ShutdownException indicates that this record processor should be shutdown. This is due to
                    # some failover event, e.g. another MultiLangDaemon has taken the lease for this shard.
                    #
                    print('Encountered shutdown exception, skipping checkpoint')
                    return
                elif 'ThrottlingException' == e.value:
                    #
                    # A ThrottlingException indicates that one of our dependencies is is over burdened, e.g. too many
                    # dynamo writes. We will sleep temporarily to let it recover.
                    #
                    if self._CHECKPOINT_RETRIES - 1 == n:
                        sys.stderr.write('Failed to checkpoint after {n} attempts, giving up.\n'.format(n=n))
                        return
                    else:
                        print('Was throttled while checkpointing, will attempt again in {s} seconds'
                              .format(s=self._SLEEP_SECONDS))
                elif 'InvalidStateException' == e.value:
                    sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n')
                else:  # Some other error
                    sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e))
            time.sleep(self._SLEEP_SECONDS)

    def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
        """
        Called for each record that is passed to process_records.

        :param str data: The blob of data that was contained in the record.
        :param str partition_key: The key associated with this recod.
        :param int sequence_number: The sequence number associated with this record.
        :param int sub_sequence_number: the sub sequence number associated with this record.
        """
        ####################################
        # Insert your processing logic here
        ####################################
        self.log("Record (Partition Key: {pk}, Sequence Number: {seq}, Subsequence Number: {sseq}, Data Size: {ds}"
                 .format(pk=partition_key, seq=sequence_number, sseq=sub_sequence_number, ds=len(data)))

    def should_update_sequence(self, sequence_number, sub_sequence_number):
        """
        Determines whether a new larger sequence number is available

        :param int sequence_number: the sequence number from the current record
        :param int sub_sequence_number: the sub sequence number from the current record
        :return boolean: true if the largest sequence should be updated, false otherwise
        """
        return self._largest_seq == (None, None) or sequence_number > self._largest_seq[0] or \
            (sequence_number == self._largest_seq[0] and sub_sequence_number > self._largest_seq[1])

    def process_records(self, process_records_input):
        """
        Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers
        from the records to indicate where in the stream to checkpoint.

        :param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the
            records.
        """
        try:
            for record in process_records_input.records:
                data = record.binary_data
                seq = int(record.sequence_number)
                sub_seq = record.sub_sequence_number
                key = record.partition_key
                self.process_record(data, key, seq, sub_seq)
                if self.should_update_sequence(seq, sub_seq):
                    self._largest_seq = (seq, sub_seq)

            #
            # Checkpoints every self._CHECKPOINT_FREQ_SECONDS seconds
            #
            if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS:
                self.checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1])
                self._last_checkpoint_time = time.time()

        except Exception as e:
            self.log("Encountered an exception while processing records. Exception was {e}\n".format(e=e))

    def lease_lost(self, lease_lost_input):
        self.log("Lease has been lost")

    def shard_ended(self, shard_ended_input):
        self.log("Shard has ended checkpointing")
        shard_ended_input.checkpointer.checkpoint()

    def shutdown_requested(self, shutdown_requested_input):
        self.log("Shutdown has been requested, checkpointing.")
        shutdown_requested_input.checkpointer.checkpoint()


if __name__ == "__main__":
    kcl_process = kcl.KCLProcess(RecordProcessor())
    kcl_process.run()
```

## Modificar as propriedades de configuração
<a name="kinesis-record-processor-initialization-py"></a>

O exemplo fornece valores padrão para as propriedades de configuração, conforme mostrado no script a seguir. É possível substituir qualquer uma dessas propriedades por seus próprios valores.

```
# The script that abides by the multi-language protocol. This script will
# be executed by the MultiLangDaemon, which will communicate with this script
# over STDIN and STDOUT according to the multi-language protocol.
executableName = sample_kclpy_app.py

# The name of an Amazon Kinesis stream to process.
streamName = words

# Used by the KCL as the name of this application. Will be used as the name
# of an Amazon DynamoDB table which will store the lease and checkpoint
# information for workers with this application name
applicationName = PythonKCLSample

# Users can change the credentials provider the KCL will use to retrieve credentials.
# The DefaultAWSCredentialsProviderChain checks several other providers, which is
# described here:
# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain

# Appended to the user agent of the KCL. Does not impact the functionality of the
# KCL in any other way.
processingLanguage = python/2.7

# Valid options at TRIM_HORIZON or LATEST.
# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
initialPositionInStream = TRIM_HORIZON

# The following properties are also available for configuring the KCL Worker that is created
# by the MultiLangDaemon.

# The KCL defaults to us-east-1
#regionName = us-east-1

# Fail over time in milliseconds. A worker which does not renew it's lease within this time interval
# will be regarded as having problems and it's shards will be assigned to other workers.
# For applications that have a large number of shards, this msy be set to a higher number to reduce
# the number of DynamoDB IOPS required for tracking leases
#failoverTimeMillis = 10000

# A worker id that uniquely identifies this worker among all workers using the same applicationName
# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself.
#workerId = 

# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
#shardSyncIntervalMillis = 60000

# Max records to fetch from Kinesis in a single GetRecords call.
#maxRecords = 10000

# Idle time between record reads in milliseconds.
#idleTimeBetweenReadsInMillis = 1000

# Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while)
#callProcessRecordsEvenForEmptyRecordList = false

# Interval in milliseconds between polling to check for parent shard completion.
# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
# completion of parent shards).
#parentShardPollIntervalMillis = 10000

# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
# to delete the ones we don't need any longer.
#cleanupLeasesUponShardCompletion = true

# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
#taskBackoffTimeMillis = 500

# Buffer metrics for at most this long before publishing to CloudWatch.
#metricsBufferTimeMillis = 10000

# Buffer at most this many metrics before publishing to CloudWatch.
#metricsMaxQueueSize = 10000

# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
# to RecordProcessorCheckpointer#checkpoint(String) by default.
#validateSequenceNumberBeforeCheckpointing = true

# The maximum number of active threads for the MultiLangDaemon to permit.
# If a value is provided then a FixedThreadPool is used with the maximum
# active threads set to the provided value. If a non-positive integer or no
# value is provided a CachedThreadPool is used.
#maxActiveThreads = 0
```

### Nome da aplicação
<a name="kinesis-record-processor-application-name-py"></a>

A KCL exige um nome de aplicação exclusivo entre as aplicações e as tabelas do Amazon DynamoDB na mesma região. Ela usa o valor de configuração de nome de aplicativo das seguintes formas:
+ Presume-se que todos os operadores associados a esse nome de aplicativo estejam trabalhando juntos no mesmo fluxo. Esses operadores podem ser distribuídos entre várias instâncias. Se você executar uma instância adicional do mesmo código da aplicação, mas com um nome diferente, a KCL tratará a segunda instância como uma aplicação totalmente independente operando no mesmo fluxo.
+ A KCL cria uma tabela do DynamoDB com o nome da aplicação e usa essa tabela para manter informações de estado (como pontos de verificação e mapeamento de operador-fragmento) da aplicação. Cada aplicação tem sua própria tabela do DynamoDB. Para obter mais informações, consulte [Usar uma tabela de concessões para monitorar os fragmentos processados pela aplicação de consumo da KCL](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable).

### Credenciais
<a name="kinesis-record-processor-creds-py"></a>

Você deve disponibilizar suas AWS credenciais para um dos provedores de credenciais na cadeia de provedores de [credenciais padrão](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). A propriedade `AWSCredentialsProvider` pode ser usada para definir um provedor de credenciais. Se você executar seu aplicativo de consumidor em uma instância do Amazon EC2, recomendamos que você configure a instância com uma função do IAM. AWS as credenciais que refletem as permissões associadas a essa função do IAM são disponibilizadas aos aplicativos na instância por meio dos metadados da instância. Essa é a maneira mais segura de gerenciar credenciais para uma aplicação de consumo em execução em uma instância do EC2.

# Desenvolver consumidores de distribuição avançada com o KCL 2.x
<a name="building-enhanced-consumers-kcl-retired"></a>

**Importante**  
As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. O KCL 1.x chegará end-of-support em 30 de janeiro de 2026. É **altamente recomendável** que você migre suas aplicações da KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a página da [biblioteca de cliente do Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) em. GitHub Para obter informações sobre as versões mais recentes da KCL, consulte [Usar a Kinesis Client Library](kcl.md). Para ter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte [Migrar da KCL 1.x para a KCL 3.x](kcl-migration-1-3.md).

Os consumidores que usam a *distribuição avançada* no Amazon Kinesis Data Streams podem receber registros de um fluxo de dados com throughput dedicada de até 2 MB de dados por segundo por fragmento. Esse tipo de consumidor não precisa lidar com outros consumidores que estejam recebendo dados do fluxo. Para obter mais informações, consulte [Desenvolver consumidores de distribuição avançada com throughput dedicado](enhanced-consumers.md).

É possível usar a versão 2.0 ou posterior da Kinesis Client Library (KCL) para desenvolver aplicações que usam a distribuição avançada para receber dados de fluxos. A KCL inscreve automaticamente seu aplicativo em todos os fragmentos de um stream e garante que seu aplicativo consumidor possa ler com um valor de taxa de transferência de 2 por fragmento. MB/sec Para usar a KCL sem ativar a distribuição avançada, consulte [Desenvolvendo aplicações de consumo usando a Kinesis Client Library 2.0](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl-v2.html).

**Topics**
+ [Desenvolver consumidores de distribuição avançada usando o KCL 2.x em Java](building-enhanced-consumers-kcl-java.md)

# Desenvolver consumidores de distribuição avançada usando o KCL 2.x em Java
<a name="building-enhanced-consumers-kcl-java"></a>

**Importante**  
As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. O KCL 1.x chegará end-of-support em 30 de janeiro de 2026. É **altamente recomendável** que você migre suas aplicações da KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a página da [biblioteca de cliente do Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) em. GitHub Para obter informações sobre as versões mais recentes da KCL, consulte [Usar a Kinesis Client Library](kcl.md). Para ter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte [Migrar da KCL 1.x para a KCL 3.x](kcl-migration-1-3.md).

É possível usar a versão 2.0 ou posterior da Kinesis Client Library (KCL) para desenvolver aplicações no Amazon Kinesis Data Streams que recebem dados de fluxos usando a distribuição avançada. O código a seguir mostra uma implementação de exemplo em Java de `ProcessorFactory` e `RecordProcessor`.

É recomendável o uso de `KinesisClientUtil` para criar `KinesisAsyncClient` e configurar `maxConcurrency` no `KinesisAsyncClient`.

**Importante**  
O Amazon Kinesis Client pode ter latência significativamente maior, a menos que se configure `KinesisAsyncClient` para ter um `maxConcurrency` alto o suficiente para permitir todas as concessões e usos adicionais do `KinesisAsyncClient`.

```
/*
 *  Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 *  Licensed under the Amazon Software License (the "License").
 *  You may not use this file except in compliance with the License.
 *  A copy of the License is located at
 *
 *  http://aws.amazon.com/asl/
 *
 *  or in the "license" file accompanying this file. This file is distributed
 *  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 *  express or implied. See the License for the specific language governing
 *  permissions and limitations under the License. 
 */

/*
 * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class SampleSingle {

    private static final Logger log = LoggerFactory.getLogger(SampleSingle.class);

    public static void main(String... args) {
        if (args.length < 1) {
            log.error("At a minimum, the stream name is required as the first argument. The Region may be specified as the second argument.");
            System.exit(1);
        }

        String streamName = args[0];
        String region = null;
        if (args.length > 1) {
            region = args[1];
        }

        new SampleSingle(streamName, region).run();
    }

    private final String streamName;
    private final Region region;
    private final KinesisAsyncClient kinesisClient;

    private SampleSingle(String streamName, String region) {
        this.streamName = streamName;
        this.region = Region.of(ObjectUtils.firstNonNull(region, "us-east-2"));
        this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region));
    }

    private void run() {
        ScheduledExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, streamName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());

        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            log.error("Caught exception while waiting for confirm. Shutting down.", ioex);
        }

        log.info("Cancelling producer, and shutting down executor.");
        producerFuture.cancel(true);
        producerExecutor.shutdownNow();

        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        log.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            log.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            log.error("Timeout while waiting for shutdown. Scheduler may not have exited.");
        }
        log.info("Completed, shutting down now.");
    }

    private void publishRecord() {
        PutRecordRequest request = PutRecordRequest.builder()
                .partitionKey(RandomStringUtils.randomAlphabetic(5, 20))
                .streamName(streamName)
                .data(SdkBytes.fromByteArray(RandomUtils.nextBytes(10)))
                .build();
        try {
            kinesisClient.putRecord(request).get();
        } catch (InterruptedException e) {
            log.info("Interrupted, assuming shutdown.");
        } catch (ExecutionException e) {
            log.error("Exception while sending data to Kinesis. Will try again next cycle.", e);
        }
    }

    private static class SampleRecordProcessorFactory implements ShardRecordProcessorFactory {
        public ShardRecordProcessor shardRecordProcessor() {
            return new SampleRecordProcessor();
        }
    }


    private static class SampleRecordProcessor implements ShardRecordProcessor {

        private static final String SHARD_ID_MDC_KEY = "ShardId";

        private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class);

        private String shardId;

        public void initialize(InitializationInput initializationInput) {
            shardId = initializationInput.shardId();
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void processRecords(ProcessRecordsInput processRecordsInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Processing {} record(s)", processRecordsInput.records().size());
                processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()));
            } catch (Throwable t) {
                log.error("Caught throwable while processing records. Aborting.");
                Runtime.getRuntime().halt(1);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void leaseLost(LeaseLostInput leaseLostInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Lost lease, so terminating.");
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shardEnded(ShardEndedInput shardEndedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at shard end. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }

        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            MDC.put(SHARD_ID_MDC_KEY, shardId);
            try {
                log.info("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            } finally {
                MDC.remove(SHARD_ID_MDC_KEY);
            }
        }
    }

}
```

# Migrar consumidores da KCL 1.x para a KCL 2.x
<a name="kcl-migration"></a>

**Importante**  
As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. O KCL 1.x chegará end-of-support em 30 de janeiro de 2026. É **altamente recomendável** que você migre suas aplicações da KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a página da [biblioteca de cliente do Amazon Kinesis](https://github.com/awslabs/amazon-kinesis-client) em. GitHub Para obter informações sobre as versões mais recentes da KCL, consulte [Usar a Kinesis Client Library](kcl.md). Para ter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte [Migrar da KCL 1.x para a KCL 3.x](kcl-migration-1-3.md).

Este tópico explica as diferenças entre as versões 1.x e 2.x da Kinesis Client Library (KCL). Ele também mostra como migrar o consumidor da versão 1.x para a versão 2.x da KCL. Depois de migrar o cliente, ele iniciará o processamento de registros a partir do local verificado pela última vez.

A versão 2.0 da KCL apresenta as seguintes alterações de interface:


**Alterações de interface da KCL**  

| Interface KCL 1.x | Interface KCL 2.0 | 
| --- | --- | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor | software.amazon.kinesis.processor.ShardRecordProcessor | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory | software.amazon.kinesis.processor.ShardRecordProcessorFactory | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware | Compactada em software.amazon.kinesis.processor.ShardRecordProcessor | 

**Topics**
+ [Migrar o processador de registros](#recrod-processor-migration)
+ [Migrar a fábrica do processador de registros](#recrod-processor-factory-migration)
+ [Migração do operador](#worker-migration)
+ [Configurar o cliente do Amazon Kinesis](#client-configuration)
+ [Remoção do tempo ocioso](#idle-time-removal)
+ [Remoções de configuração de cliente](#client-configuration-removals)

## Migrar o processador de registros
<a name="recrod-processor-migration"></a>

Este exemplo mostra um processador de registros implementado para a KCL 1.x:

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        //
        // Process records, and possibly checkpoint
        //
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**Como migrar a classe de processador de registro**

1. Altere as interfaces de `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` e `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` para `software.amazon.kinesis.processor.ShardRecordProcessor`, da seguinte forma:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
   public class TestRecordProcessor implements ShardRecordProcessor {
   ```

1. Atualize as instruções `import` para os métodos `initialize` e `processRecords`.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
   ```

1. Substitua o método `shutdown` pelos novos métodos a seguir: `leaseLost`, `shardEnded`, e `shutdownRequested`.

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

Veja a seguir a versão atualizada da classe de processador de registro.

```
package com.amazonaws.kcl;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class TestRecordProcessor implements ShardRecordProcessor {
    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

## Migrar a fábrica do processador de registros
<a name="recrod-processor-factory-migration"></a>

A fábrica do processador de registros é responsável por criar processadores de registro quando uma concessão é realizada. Veja a seguir um exemplo de uma fábrica da KCL 1.x.

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class TestRecordProcessorFactory implements IRecordProcessorFactory {
    @Override
    public IRecordProcessor createProcessor() {
        return new TestRecordProcessor();
    }
}
```

**Migrar a fábrica do processador de registros**

1. Altere a interface implementada de `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` para `software.amazon.kinesis.processor.ShardRecordProcessorFactory`, da seguinte forma:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
   
   // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
   public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
   ```

1. Alterar a assinatura de retorno para `createProcessor`.

   ```
   // public IRecordProcessor createProcessor() {
   public ShardRecordProcessor shardRecordProcessor() {
   ```

Veja a seguir um exemplo da fábrica do processador de registros em 2.0:

```
package com.amazonaws.kcl;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new TestRecordProcessor();
    }
}
```

## Migração do operador
<a name="worker-migration"></a>

Na versão 2.0 da KCL, uma nova classe, chamada `Scheduler`, substitui a classe `Worker`. Veja a seguir um exemplo de um operador da KCL 1.x.

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

**Para migrar o operador**

1. Altere a instrução `import` para a classe `Worker` para as instruções de importação para as classes `Scheduler` e `ConfigsBuilder`.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. Crie o `ConfigsBuilder` e um `Scheduler` conforme mostrado no exemplo a seguir.

   É recomendável o uso de `KinesisClientUtil` para criar `KinesisAsyncClient` e configurar `maxConcurrency` no `KinesisAsyncClient`.
**Importante**  
O Amazon Kinesis Client pode ter latência significativamente maior, a menos que se configure `KinesisAsyncClient` para ter um `maxConcurrency` alto o suficiente para permitir todas as concessões e usos adicionais do `KinesisAsyncClient`.

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   import software.amazon.kinesis.common.ConfigsBuilder;
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
   Region region = Region.AP_NORTHEAST_2;
   KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
   ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());
   
   Scheduler scheduler = new Scheduler(
       configsBuilder.checkpointConfig(),
       configsBuilder.coordinatorConfig(),
       configsBuilder.leaseManagementConfig(),
       configsBuilder.lifecycleConfig(),
       configsBuilder.metricsConfig(),
       configsBuilder.processorConfig(),
       configsBuilder.retrievalConfig()
       );
   ```

## Configurar o cliente do Amazon Kinesis
<a name="client-configuration"></a>

Com a versão 2.0 da Kinesis Client Library, a configuração do cliente passou de uma única classe de configuração (`KinesisClientLibConfiguration`) para seis classes. A tabela a seguir descreve a migração.


**Campos de Configuração e suas Novas Classes**  

| Campo Original | Nova classe de configuração | Description | 
| --- | --- | --- | 
| applicationName | ConfigsBuilder | O nome da aplicação da KCL. Usado como padrão para o tableName e o consumerName. | 
| tableName | ConfigsBuilder | Permite substituir o nome usado para a tabela de concessão do Amazon DynamoDB. | 
| streamName | ConfigsBuilder | O nome do fluxo a partir do qual esse aplicativo processa registros. | 
| kinesisEndpoint | ConfigsBuilder | Essa opção não existe mais. Consulte remoções de configuração de cliente. | 
| dynamoDBEndpoint | ConfigsBuilder | Essa opção não existe mais. Consulte remoções de configuração de cliente. | 
| initialPositionInStreamExtended | RetrievalConfig | A localização no fragmento a partir da qual a KCL começa a obter registros, começando com a execução inicial do aplicativo. | 
| kinesisCredentialsProvider | ConfigsBuilder | Essa opção não existe mais. Consulte remoções de configuração de cliente. | 
| dynamoDBCredentialsProvider | ConfigsBuilder | Essa opção não existe mais. Consulte remoções de configuração de cliente. | 
| cloudWatchCredentialsProvider | ConfigsBuilder | Essa opção não existe mais. Consulte remoções de configuração de cliente. | 
| failoverTimeMillis | LeaseManagementConfig | O número de milissegundos que devem passar antes que se considere uma falha do proprietário da concessão. | 
| workerIdentifier | ConfigsBuilder | Um identificador exclusivo que representa a instanciação do processador do aplicativo. Isso deve ser exclusivo. | 
| shardSyncIntervalMillis | LeaseManagementConfig | O tempo entre as chamadas de sincronização de fragmentos. | 
| maxRecords | PollingConfig | Permite definir o número máximo de registros que o Kinesis retorna. | 
| idleTimeBetweenReadsInMillis | CoordinatorConfig | Essa opção não existe mais. Consulte a remoção do tempo ocioso. | 
| callProcessRecordsEvenForEmptyRecordList | ProcessorConfig | Quando definido, o processador de registros é chamado mesmo quando o Kinesis não fornece nenhum registro. | 
| parentShardPollIntervalMillis | CoordinatorConfig | Com que frequência um processador de registros deve sondar a conclusão de fragmentos pai. | 
| cleanupLeasesUponShardCompletion | LeaseManagementConfig | Quando definidas, as concessões são removidas assim que as concessões filho iniciam o processamento. | 
| ignoreUnexpectedChildShards | LeaseManagementConfig | Quando definidos, fragmentos filho que possuem um fragmento aberto são ignorados. Essa configuração destina-se principalmente a fluxos do DynamoDB. | 
| kinesisClientConfig | ConfigsBuilder | Essa opção não existe mais. Consulte remoções de configuração de cliente. | 
| dynamoDBClientConfig | ConfigsBuilder | Essa opção não existe mais. Consulte remoções de configuração de cliente. | 
| cloudWatchClientConfig | ConfigsBuilder | Essa opção não existe mais. Consulte remoções de configuração de cliente. | 
| taskBackoffTimeMillis | LifecycleConfig | O tempo de espera para repetir tarefas com falha. | 
| metricsBufferTimeMillis | MetricsConfig | Controla a publicação de CloudWatch métricas. | 
| metricsMaxQueueSize | MetricsConfig | Controla a publicação de CloudWatch métricas. | 
| metricsLevel | MetricsConfig | Controla a publicação de CloudWatch métricas. | 
| metricsEnabledDimensions | MetricsConfig | Controla a publicação de CloudWatch métricas. | 
| validateSequenceNumberBeforeCheckpointing | CheckpointConfig | Essa opção não existe mais. Consulte a validação do número de sequência do ponto de verificação. | 
| regionName | ConfigsBuilder | Essa opção não existe mais. Consulte remoção de configuração de cliente. | 
| maxLeasesForWorker | LeaseManagementConfig | O número máximo de concessões que uma única instância do aplicativo deve aceitar. | 
| maxLeasesToStealAtOneTime | LeaseManagementConfig | O número máximo de concessões que um aplicativo deve tentar roubar de uma só vez. | 
| initialLeaseTableReadCapacity | LeaseManagementConfig | A IOPs leitura do DynamoDB que é usada se a biblioteca cliente do Kinesis precisar criar uma nova tabela de lease do DynamoDB. | 
| initialLeaseTableWriteCapacity | LeaseManagementConfig | A IOPs leitura do DynamoDB que é usada se a biblioteca cliente do Kinesis precisar criar uma nova tabela de lease do DynamoDB. | 
| initialPositionInStreamExtended | LeaseManagementConfig | A posição inicial do aplicativo no fluxo. Isso é usado somente durante a criação da concessão inicial. | 
| skipShardSyncAtWorkerInitializationIfLeasesExist | CoordinatorConfig | Desative a sincronização de dados de fragmento se a tabela de concessão contiver concessões existentes. TODO: KinesisEco -438 | 
| shardPrioritization | CoordinatorConfig | A priorização de fragmentos a ser usada. | 
| shutdownGraceMillis | N/D | Essa opção não existe mais. Consulte MultiLang Remoções. | 
| timeoutInSeconds | N/D | Essa opção não existe mais. Consulte MultiLang Remoções. | 
| retryGetRecordsInSeconds | PollingConfig | Configura o atraso entre as GetRecords tentativas de falhas. | 
| maxGetRecordsThreadPool | PollingConfig | O tamanho do pool de fios usado para GetRecords. | 
| maxLeaseRenewalThreads | LeaseManagementConfig | Controla o tamanho do grupo de threads de renovação de concessão. Quanto mais concessões seu aplicativo aceitar, maior esse grupo deve ser. | 
| recordsFetcherFactory | PollingConfig | Permite substituir a fábrica usada para criar extratores que recuperam dados dos streams. | 
| logWarningForTaskAfterMillis | LifecycleConfig | Quanto tempo esperar antes de um aviso ser registrado caso uma tarefa não seja concluída. | 
| listShardsBackoffTimeInMillis | RetrievalConfig | O número de milissegundos de espera entre as chamadas para ListShards em caso de falha. | 
| maxListShardsRetryAttempts | RetrievalConfig | O número máximo de novas tentativas de ListShards antes de desistir. | 

## Remoção do tempo ocioso
<a name="idle-time-removal"></a>

Na versão 1.x da KCL, `idleTimeBetweenReadsInMillis` corresponde a duas quantidades: 
+ A quantidade de tempo entre as verificações de envio de tarefas. Agora é possível configurar esse tempo entre tarefas, definindo `CoordinatorConfig#shardConsumerDispatchPollIntervalMillis`.
+ A quantidade de tempo inativo quando nenhum registro é retornado do Kinesis Data Streams. Na versão 2.0, em distribuição avançada registros são enviados a partir de sua respectiva recuperação. Atividade no do consumidor fragmento só ocorre quando uma solicitação é enviada. 

## Remoções de configuração de cliente
<a name="client-configuration-removals"></a>

Na versão 2.0, a KCL não cria mais clientes. Ela depende do fornecimento de um cliente válido pelo usuário. Com essa alteração, todos os parâmetros de configuração que controlavam a criação do cliente foram removidos. Se esses parâmetros forem necessários, pode-se configurá-los nos clientes antes de fornecê-los ao `ConfigsBuilder`.


****  

| Campo removido | Configuração equivalente | 
| --- | --- | 
| kinesisEndpoint | Configure o SDK KinesisAsyncClient com o endpoint de sua preferência: KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis endpoint>")).build(). | 
| dynamoDBEndpoint | Configure o SDK DynamoDbAsyncClient com o endpoint de sua preferência: DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb endpoint>")).build(). | 
| kinesisClientConfig | Configure o SDK KinesisAsyncClient com a configuração necessária: KinesisAsyncClient.builder().overrideConfiguration(<your configuration>).build(). | 
| dynamoDBClientConfig | Configure o SDK DynamoDbAsyncClient com a configuração necessária: DynamoDbAsyncClient.builder().overrideConfiguration(<your configuration>).build(). | 
| cloudWatchClientConfig | Configure o SDK CloudWatchAsyncClient com a configuração necessária: CloudWatchAsyncClient.builder().overrideConfiguration(<your configuration>).build(). | 
| regionName | Configure o SDK com a Região de sua preferência. Ela é a mesma para todos os clientes do SDK. Por exemplo, .KinesisAsyncClient.builder().region(Region.US\$1WEST\$12).build() | 