As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Informações sobre KCL 1.x e 2.x
nota
As versões 1.x e 2.x da Kinesis Client Library (KCL) estão desatualizadas. Recomendamos migrar para a versão 3.x do KCL, que oferece desempenho aprimorado e novos recursos. Para obter a documentação mais recente da KCL e o guia de migração, consulteUse a biblioteca de cliente Kinesis.
Um dos métodos de desenvolvimento de aplicações de consumo personalizadas que podem processar dados de fluxos de dados do KDS é usar a Kinesis Client Library (KCL).
Tópicos
- Sobre a KCL (versões anteriores)
- Versões anteriores do KCL
- Conceitos da KCL (versões anteriores)
- Usar uma tabela de concessões para monitorar os fragmentos processados pela aplicação de consumo da KCL
- Processar vários fluxos de dados com a mesma aplicação de consumo da KCL 2.x para Java
- Use o KCL com o Registro do AWS Glue Esquema
nota
Recomenda-se o uso da versão mais recente da KCL 1.x ou da KCL 2.x, dependendo do cenário de uso. Ambas as versões da KCL, tanto 1.x como a 2.x, são atualizadas regularmente para incluir os patches de dependência e segurança e as correções de bugs mais recentes, além de novos recursos compatíveis com versões anteriores. Para obter mais informações, consulte https://github.com/awslabs/amazon-kinesis-client/releases
Sobre a KCL (versões anteriores)
A KCL ajuda você a consumir e processar dados de um fluxo de dados do Kinesis lidando com muitas das tarefas complexas associadas à computação distribuída. Isso inclui balanceamento de carga em várias instâncias de aplicações de consumo, resposta a falhas nas instâncias de aplicações de clientes, verificação de registros processados e reação à refragmentação. A KCL cuida de todas essas subtarefas para possibilitar a concetração de esforços na escrita de uma lógica personalizada de processamento de registros.
O KCL é diferente dos Kinesis Data APIs Streams que estão disponíveis no. AWS SDKs O Kinesis APIs Data Streams ajuda você a gerenciar muitos aspectos do Kinesis Data Streams, incluindo a criação de streams, a refragmentação e a colocação e obtenção de registros. A KCL fornece uma camada de abstração em torno de todas essas subtarefas, especificamente para possibilitar a concentração na lógica de processamento de dados personalizada da aplicação de consumo. Para obter informações sobre a API do Kinesis Data Streams, consulte a Referência de APIs do Amazon Kinesis.
Importante
A KCL é uma biblioteca Java. Support para linguagens diferentes de Java é fornecido usando uma interface multilíngue chamada de MultiLangDaemon. Esse daemon baseado em Java é executado em segundo plano quando uma linguagem de KCL diferente de Java é utilizada. Por exemplo, se você instalar o KCL para Python e escrever seu aplicativo de consumidor inteiramente em Python, você ainda precisará do Java instalado em seu sistema por causa do. MultiLangDaemon Além disso, MultiLangDaemon tem algumas configurações padrão que talvez você precise personalizar para seu caso de uso, por exemplo, a AWS região à qual ele se conecta. Para obter mais informações sobre o MultiLangDaemon on GitHub, consulte o MultiLangDaemon projeto KCL
A KCL atua como um intermediário entre a lógica de processamento de registros e o Kinesis Data Streams.
Versões anteriores do KCL
Atualmente, é possível usar qualquer uma destas versões compatíveis da KCL para criar aplicações de consumo personalizadas:
-
KCL 1.x
Para ter mais informações, consulte Desenvolver aplicações de consumo da KCL 1.x
-
KCL 2.x
Para ter mais informações, consulte Desenvolver aplicações de consumo da KCL 2.x
Pode-se usar a KCL 1.x ou a KCL 2.x para criar aplicações de consumo que usam throughput compartilhada. Para obter mais informações, consulte Desenvolver aplicações de consumo personalizadas com throughput compartilhada usando a KCL.
Para criar aplicações de consumo que usam throughput dedicada (consumidores de distribuição avançada), só é possível usar a KCL 2.x. Para obter mais informações, consulte Desenvolva consumidores de distribuição aprimorados com taxa de transferência dedicada.
Para obter informações sobre as diferenças entre a KCL 1.x e a KCL 2.x e instruções sobre como migrar da KCL 1.x para a KCL 2.x, consulte Migre consumidores do KCL 1.x para o KCL 2.x.
Conceitos da KCL (versões anteriores)
-
Aplicação de consumo da KCL: uma aplicação personalizada que usa a KCL e é projetada para ler e processar registros de fluxos de dados.
-
Instância de aplicação de consumo: as aplicações de cliente da KCL normalmente são distribuídas, com uma ou mais instâncias executadas simultaneamente para coordenar falhas e balancear dinamicamente a carga de processamento dos registros de dados.
-
Operador: uma classe de alto nível que uma instância de aplicação de consumo da KCL usa para começar a processar dados.
Importante
Cada instância da aplicação de consumo da KCL tem um operador.
O operador inicializa e supervisiona várias tarefas, incluindo a sincronização de informações de fragmentos e concessões, o monitoramento de atribuições de fragmentos e o processamento dos dados dos fragmentos. Um trabalhador fornece à KCL as informações de configuração do aplicativo consumidor, como o nome do fluxo de dados cujos registros de dados esse aplicativo consumidor KCL processará e AWS as credenciais necessárias para acessar esse fluxo de dados. O operador também inicia a instância específica da aplicação de consumo da KCL para entregar registros de dados do fluxo de dados aos processadores de registros.
Importante
Na KCL 1.x, essa classe é chamada de operador. Para obter mais informações (esses são os repositórios Java KCL), consulte https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker
.java. Na KCL 2.x, essa classe é chamada de programador. A finalidade do programador na KCL 2.x é idêntica à finalidade do operador na KCL 1.x. Para obter mais informações sobre a classe Scheduler no KCL 2.x, consulte https://github.com/awslabs/amazon-kinesis-client/.java. blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler -
Concessão: dado que define a ligação entre um operador e um fragmento. As aplicações de cliente distribuídas da KCL usam concessões para particionar o processamento de registros de dados em uma frota de operadores. A qualquer momento, cada fragmento de registros de dados é associado a um determinado operador por uma concessão identificada pela variável leaseKey.
Por padrão, um trabalhador pode manter um ou mais arrendamentos (sujeito ao valor da variável maxLeasesForWorker) ao mesmo tempo.
Importante
Os operadores competem para manter todas as concessões disponíveis para todos os fragmentos disponíveis em um fluxo de dados. Mas apenas um operador consegue manter uma concessão de cada vez.
Por exemplo, se houver uma instância da aplicação de consumo A com o operador A que está processando um fluxo de dados com quatro fragmentos, o operador A poderá reter as concessões aos fragmentos 1, 2, 3 e 4 ao mesmo tempo. Mas, se houver duas instâncias de aplicações de consumo A e B com os operadores A e B, e essas instâncias estiverem processando um fluxo de dados com quatro fragmentos, o operador A e o operador B não poderão reter a concessão ao fragmento 1 ao mesmo tempo. Um operador retém a concessão a um fragmento específico até estar pronto para parar de processar os registros de dados do fragmento ou até que uma falha ocorra. Quando um operador libera a concessão, outro operador a assume e a retém.
Para obter mais informações (esses são os repositórios Java KCL), consulte https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java para KCL 1.x e https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease
.java para KCL 2.x. -
Tabela de concessões: uma tabela exclusiva do Amazon DynamoDB usada para monitorar os fragmentos em um fluxo de dados do KDS vinculados a uma concessão e sendo processados pelos operadores da aplicação de consumo da KCL. A tabela de concessões precisa permanecer sincronizada (em um operador e entre todos os operadores) com as informações mais recentes do fragmento do fluxo de dados enquanto a aplicação de consumo da KCL está em execução. Para obter mais informações, consulte Usar uma tabela de concessões para monitorar os fragmentos processados pela aplicação de consumo da KCL.
-
Processador de registros: a lógica que define como a aplicação de consumo da KCL processa os dados obtidos dos fluxos de dados. Em runtime, uma instância da aplicação de consumo da KCL inicia um operador, que, por sua vez, inicia um processador de registros para cada fragmento cuja concessão retém.
Usar uma tabela de concessões para monitorar os fragmentos processados pela aplicação de consumo da KCL
Tópicos
O que é uma tabela de concessões
Em cada aplicação do Amazon Kinesis Data Streams, a KCL usa uma tabela de concessões exclusiva (armazenada em uma tabela do Amazon DynamoDB) para monitorar os fragmentos em um fluxo de dados do KDS vinculados a uma concessão e sendo processados pelos operadores da aplicação de consumo da KCL.
Importante
Como a KCL usa o nome da aplicação de consumo para criar o nome da tabela de concessões que a aplicação usa, cada aplicação de consumo deve ter um nome exclusivo.
É possível visualizar a tabela usando o console do Amazon DynamoDB enquanto a aplicação de consumo está em execução.
Se a tabela de concessões da aplicação de consumo da KCL não existir quando a aplicação for inicializada, um dos operadores a criará.
Importante
Sua conta é cobrada pelos custos associados à tabela do DynamoDB, além dos custos associados ao próprio Kinesis Data Streams.
Cada linha na tabela de concessões representa um fragmento que está sendo processado pelos operadores da aplicação de consumo. Se a aplicação de consumo da KCL processar somente um fluxo de dados, a chave de hash da tabela de concessões, leaseKey
, será o ID do fragmento. Em caso de Processar vários fluxos de dados com a mesma aplicação de consumo da KCL 2.x para Java, a estrutura da leaseKey será semelhante a account-id:StreamName:streamCreationTimestamp:ShardId
. Por exemplo, 111111111:multiStreamTest-1:12345:shardId-000000000336
.
Além do ID do fragmento, cada linha também inclui os seguintes dados:
-
checkpoint: número de sequência do ponto de verificação mais recente do fragmento. Esse valor é exclusivo entre todos os fragmentos no fluxo de dados.
-
checkpointSubSequenceNúmero: ao usar o recurso de agregação da Kinesis Producer Library, essa é uma extensão do ponto de verificação que rastreia registros individuais de usuários dentro do registro do Kinesis.
-
leaseCounter: usado para versionamento de concessão, para que os operadores possam detectar se a própria concessão foi assumida por outro operador.
-
leaseKey: um identificador exclusivo para uma concessão. Cada concessão é específica a um fragmento no fluxo de dados e é retida por um operador por vez.
-
leaseOwner: o operador que está retendo essa concessão.
-
ownerSwitchesSincePonto de verificação: Quantas vezes esse contrato mudou de trabalhadores desde a última vez que um posto de controle foi escrito.
-
parentShardId: usado para garantir que o fragmento principal seja totalmente processado antes do início do processamento nos fragmentos secundários. Isso garante que os registros sejam processados na mesma ordem em que foram colocados no fluxo.
-
hashrange: usado pelo
PeriodicShardSyncManager
para executar sincronizações periódicas a fim de encontrar fragmentos ausentes na tabela de concessões e criar concessões para eles, se necessário.nota
A partir da KCL 1.14 e da KCL 2.3, esse dado está presente na tabela de concessões de cada fragmento. Para obter mais informações sobre
PeriodicShardSyncManager
e a sincronização periódica entre concessões e fragmentos, consulte Como a tabela de concessões é sincronizada com fragmentos em um fluxo de dados do KDS. -
childshards: usado por
LeaseCleanupManager
para revisar o status de processamento do fragmento filho e decidir se o fragmento pai pode ser excluído da tabela de concessões.nota
A partir da KCL 1.14 e da KCL 2.3, esse dado está presente na tabela de concessões de cada fragmento.
-
shardID: o ID do fragmento.
nota
Esse dado só estará presente na tabela de concessões se você estiver Processar vários fluxos de dados com a mesma aplicação de consumo da KCL 2.x para Java. Isso só tem suporte na KCL 2.x para Java, a partir da KCL 2.3 para Java e versões posteriores.
-
stream name o identificador do fluxo de dados no formato
account-id:StreamName:streamCreationTimestamp
.nota
Esse dado só estará presente na tabela de concessões se você estiver Processar vários fluxos de dados com a mesma aplicação de consumo da KCL 2.x para Java. Isso só tem suporte na KCL 2.x para Java, a partir da KCL 2.3 para Java e versões posteriores.
Throughput
Se sua aplicação do Amazon Kinesis Data Streams receber exceções de throughput provisionada, é necessário aumentar a throughput provisionada para a tabela do DynamoDB. A KCL cria a tabela com uma throughput provisionada de 10 leituras por segundo e 10 gravações por segundo, mas isso pode não ser suficiente para a aplicação. Por exemplo, se uma aplicação do Amazon Kinesis Data Streams definir pontos de verificação ou usar operadores com frequência em um fluxo de dados composto por vários fragmentos, talvez seja necessário uma throughput maior.
Para obter informações sobre a throughput provisionada no DynamoDB, consulte Modo de capacidade de leitura/gravação e Trabalhar com tabelas e dados no DynamoDB no Guia do desenvolvedor do Amazon DynamoDB.
Como a tabela de concessões é sincronizada com fragmentos em um fluxo de dados do KDS
Os operadores das aplicações de consumo da KCL usam concessões para processar fragmentos de um determinado fluxo de dados. As informações sobre qual operador usa a concessão a um fragmento em um momento determinado são armazenadas em uma tabela de concessões. A tabela de concessões precisa permanecer sincronizada com as informações mais recentes do fragmento do fluxo de dados enquanto a aplicação de consumo da KCL está em execução. A KCL sincroniza a tabela de concessões com as informações de fragmentos adquiridas do serviço Kinesis Data Streams durante a inicialização ou o reinício da aplicação de consumo e sempre que um fragmento sendo processado chega ao fim (refragmentação). Em outras palavras, os operadores ou uma aplicação de consumo da KCL são sincronizados com o fluxo de dados que estão processando durante a inicialização da aplicação e sempre que a aplicação encontra um evento de refragmentação do fluxo de dados.
Tópicos
Sincronização na KCL 1.0 - 1.13 e na KCL 2.0 - 2.2
No KCL 1.0 - 1.13 e no KCL 2.0 - 2.2, durante a inicialização do aplicativo consumidor e também durante cada evento de refragmentação do fluxo de dados, o KCL sincroniza a tabela de leasing com as informações de fragmentos adquiridas do serviço Kinesis Data Streams invocando a ou a descoberta. ListShards
DescribeStream
APIs Em todas as versões da KCL listadas acima, cada operador de uma aplicação de consumo da KCL conclui as seguintes etapas para realizar a sincronização de concessão/fragmento durante a inicialização da aplicação e em cada evento de refragmentação do fluxo:
-
Busca todos os fragmentos de dados do fluxo sendo processado
-
Busca todas as concessões do fragmento da tabela de concessões
-
Filtra cada fragmento aberto sem uma concessão na tabela de concessões
-
Itera em todos os fragmentos abertos encontrados e, para cada fragmento aberto sem pai aberto:
-
Percorre a árvore hierárquica no caminho dos ancestrais para determinar se o fragmento é um descendente. Um fragmento será considerado descendente se um fragmento ancestral estiver sendo processado (a entrada de concessão do fragmento ancestral existe na tabela de concessões) ou se houver um fragmento ancestral que deve ser processado (por exemplo, a posição inicial é
TRIM_HORIZON
ouAT_TIMESTAMP
). -
Se o fragmento aberto for descendente, a KCL verificará sua posição inicial e criará concessões para seus pais, se necessário.
-
Sincronização na KCL 2.x a partir da KCL 2.3 e em versões posteriores
A partir das versões mais recentes compatíveis da KCL 2.x (KCL 2.3) e posteriores, a biblioteca oferece suporte às alterações no processo de sincronização listadas a seguir. Essas mudanças na sincronização de concessão/fragmento reduzem significativamente o número de chamadas de API feitas pelas aplicações de consumo da KCL ao serviço Kinesis Data Streams e otimizam o gerenciamento de concessões nessa aplicação.
-
Na inicialização da aplicação, se a tabela de concessões estiver vazia, a KCL utilizará a opção de filtragem da API
ListShard
(o parâmetro de solicitaçãoShardFilter
opcional) para recuperar e criar concessões somente para um instantâneo dos fragmentos abertos no momento especificado pelo parâmetroShardFilter
. O parâmetroShardFilter
permite filtrar a resposta da APIListShards
. A única propriedade obrigatória do parâmetroShardFilter
éType
. A KCL usa a propriedade de filtroType
e os seguintes valores válidos para identificar e retornar um instantâneo dos fragmentos abertos que podem exigir novas concessões:-
AT_TRIM_HORIZON
: a resposta inclui todos os fragmentos abertos emTRIM_HORIZON
. -
AT_LATEST
: a resposta inclui somente os fragmentos do fluxo de dados abertos no momento. -
AT_TIMESTAMP
: a resposta inclui todos os fragmentos com timestamp inicial menor ou igual ao timestamp fornecido e timestamp final maior ou igual ao timestamp fornecido ou ainda abertos.
ShardFilter
é usado ao criar uma concessão em uma tabela de concessões vazia para inicializar concessões para um instantâneo dos fragmentos especificados emRetrievalConfig#initialPositionInStreamExtended
.Para obter mais informações sobre o
ShardFilter
, consulte https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html. -
-
Em vez de todos os trabalhadores realizarem a lease/shard synchronization to keep the lease table up to date with the latest shards in the data stream, a single elected worker leader performs the lease/shard sincronização.
-
O KCL 2.3 usa o parâmetro de
ChildShards
retorno doGetRecords
e doSubscribeToShard
APIs para realizar a sincronização de arrendamento/fragmento que acontece emSHARD_END
para fragmentos fechados, permitindo que um trabalhador da KCL crie concessões somente para os fragmentos secundários do fragmento que ele concluiu o processamento. Em aplicações de consumo com throughput compartilhada, a otimização da sincronização de concessão/fragmento usa o parâmetroChildShards
da APIGetRecords
. Em aplicações de consumo com throughput dedicada (distribuição avançada), a otimização da sincronização de concessão/fragmento usa o parâmetroChildShards
da APISubscribeToShard
. Para ter mais informações, consulte GetRecords, SubscribeToShards e ChildShard. -
Com as mudanças acima, o comportamento da KCL está passando de um modelo no qual todos os operadores obtêm informações de todos os fragmentos existentes para um modelo em que os operadores só obtêm informações dos filhos do fragmento que possui. Portanto, além da sincronização que ocorre durante a inicialização da aplicação de consumo e nos eventos de refragmentação, a KCL agora também realiza varreduras periódicas adicionais de fragmento/concessão para identificar possíveis falhas na tabela de concessões (em outras palavras, para identificar todos os novos fragmentos), confirmando que o intervalo de hash completo do fluxo de dados está sendo processado e criar concessões para eles, se necessário.
PeriodicShardSyncManager
é o componente responsável pela execução periódica de varreduras de concessão/fragmento.Para obter mais informações sobre o
PeriodicShardSyncManager
KCL 2.3, consulte https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java #L201-L213. A KCL 2.3 tem novas opções disponíveis para configuração de
PeriodicShardSyncManager
emLeaseManagementConfig
:Name Valor padrão Descrição leasesRecoveryAuditorExecutionFrequencyMillis 120.000 (2 minutos)
Frequência (em milissegundos) do trabalho do auditor para verificar concessões parciais na tabela de concessões. Se detectar alguma falha nas concessões de um fluxo, o auditor acionará a sincronização de fragmentos com base em
leasesRecoveryAuditorInconsistencyConfidenceThreshold
.leasesRecoveryAuditorInconsistencyConfidenceThreshold 3
Limite de confiança no trabalho periódico do auditor para determinar se as concessões de um fluxo de dados na tabela de concessões são inconsistentes. Se encontrar consecutivamente o mesmo conjunto de inconsistências em um fluxo de dados pelo número de vezes definido, o auditor acionará uma sincronização de fragmentos.
Agora, novas CloudWatch métricas também são emitidas para monitorar a integridade do
PeriodicShardSyncManager
. Para obter mais informações, consulte PeriodicShardSyncManager. -
Inclui uma otimização de
HierarchicalShardSyncer
para criar apenas concessões em uma camada de fragmentos.
Sincronização na KCL 1.x a partir da KCL 1.14 e em versões posteriores
A partir das versões mais recentes compatíveis da KCL 1.x (KCL 1.14) e posteriores, a biblioteca oferece suporte às alterações no processo de sincronização listadas a seguir. Essas mudanças na sincronização de concessão/fragmento reduzem significativamente o número de chamadas de API feitas pelas aplicações de consumo da KCL ao serviço Kinesis Data Streams e otimizam o gerenciamento de concessões nessa aplicação.
-
Na inicialização da aplicação, se a tabela de concessões estiver vazia, a KCL utilizará a opção de filtragem da API
ListShard
(o parâmetro de solicitaçãoShardFilter
opcional) para recuperar e criar concessões somente para um instantâneo dos fragmentos abertos no momento especificado pelo parâmetroShardFilter
. O parâmetroShardFilter
permite filtrar a resposta da APIListShards
. A única propriedade obrigatória do parâmetroShardFilter
éType
. A KCL usa a propriedade de filtroType
e os seguintes valores válidos para identificar e retornar um instantâneo dos fragmentos abertos que podem exigir novas concessões:-
AT_TRIM_HORIZON
: a resposta inclui todos os fragmentos abertos emTRIM_HORIZON
. -
AT_LATEST
: a resposta inclui somente os fragmentos do fluxo de dados abertos no momento. -
AT_TIMESTAMP
: a resposta inclui todos os fragmentos com timestamp inicial menor ou igual ao timestamp fornecido e timestamp final maior ou igual ao timestamp fornecido ou ainda abertos.
ShardFilter
é usado ao criar uma concessão em uma tabela de concessões vazia para inicializar concessões para um instantâneo dos fragmentos especificados emKinesisClientLibConfiguration#initialPositionInStreamExtended
.Para obter mais informações sobre o
ShardFilter
, consulte https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html. -
-
Em vez de todos os trabalhadores realizarem a lease/shard synchronization to keep the lease table up to date with the latest shards in the data stream, a single elected worker leader performs the lease/shard sincronização.
-
O KCL 1.14 usa o parâmetro de
ChildShards
retorno doGetRecords
e doSubscribeToShard
APIs para realizar a sincronização de arrendamento/fragmento que acontece emSHARD_END
para fragmentos fechados, permitindo que um trabalhador da KCL crie concessões somente para os fragmentos secundários do fragmento que ele concluiu o processamento. Para ter mais informações, consulte GetRecords e ChildShard. -
Com as mudanças acima, o comportamento da KCL está passando de um modelo no qual todos os operadores obtêm informações de todos os fragmentos existentes para um modelo em que os operadores só obtêm informações dos filhos do fragmento que possui. Portanto, além da sincronização que ocorre durante a inicialização da aplicação de consumo e nos eventos de refragmentação, a KCL agora também realiza varreduras periódicas adicionais de fragmento/concessão para identificar possíveis falhas na tabela de concessões (em outras palavras, para identificar todos os novos fragmentos), confirmando que o intervalo de hash completo do fluxo de dados está sendo processado e criar concessões para eles, se necessário.
PeriodicShardSyncManager
é o componente responsável pela execução periódica de varreduras de concessão/fragmento.Quando
KinesisClientLibConfiguration#shardSyncStrategyType
é definido comoShardSyncStrategyType.SHARD_END
,PeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThreshold
é usado para determinar o limite do número de varreduras consecutivas contendo lacunas na tabela de concessões após o qual é necessário impor uma sincronização de fragmentos. QuandoKinesisClientLibConfiguration#shardSyncStrategyType
é definido comoShardSyncStrategyType.PERIODIC
,leasesRecoveryAuditorInconsistencyConfidenceThreshold
é ignorado.Para obter mais informações sobre o
PeriodicShardSyncManager
KCL 1.14, consulte https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L987 -L999. A KCL 1.14 tem uma nova opção disponível para configuração de
PeriodicShardSyncManager
emLeaseManagementConfig
:Name Valor padrão Descrição leasesRecoveryAuditorInconsistencyConfidenceThreshold 3
Limite de confiança no trabalho periódico do auditor para determinar se as concessões de um fluxo de dados na tabela de concessões são inconsistentes. Se encontrar consecutivamente o mesmo conjunto de inconsistências em um fluxo de dados pelo número de vezes definido, o auditor acionará uma sincronização de fragmentos.
Agora, novas CloudWatch métricas também são emitidas para monitorar a integridade do
PeriodicShardSyncManager
. Para obter mais informações, consulte PeriodicShardSyncManager. -
A KCL 1.14 agora também oferece suporte à limpeza adiada de concessões. As concessões são excluídas de forma assíncrona por
LeaseCleanupManager
ao chegar aoSHARD_END
quando um fragmento ultrapassar o período de retenção do fluxo de dados ou quando for fechado por uma operação de refragmentação.Novas opções disponíveis para configuração de
LeaseCleanupManager
:Name Valor padrão Descrição leaseCleanupIntervalMillis 1 minuto
Intervalo de execução do thread de limpeza de concessões.
completedLeaseCleanupIntervalMillis 5 minutos Intervalo de verificação de conclusão da concessão.
garbageLeaseCleanupIntervalMillis 30 minutos Intervalo de verificação do estado de lixo de uma concessão (ou seja, reduzida após o período de retenção do fluxo de dados).
-
Inclui uma otimização de
KinesisShardSyncer
para criar apenas concessões em uma camada de fragmentos.
Processar vários fluxos de dados com a mesma aplicação de consumo da KCL 2.x para Java
Esta seção descreve as seguintes alterações na KCL 2.x para Java que permitem criar aplicações de consumo da KCL que podem processar mais de um fluxo de dados ao mesmo tempo.
Importante
O processamento multifluxo só tem suporte na KCL 2.x para Java, a partir da KCL 2.3 para Java e versões posteriores.
O processamento multifluxo NÃO tem suporte em nenhuma outra linguagem na qual a KCL 2.x possa ser implementada.
O processamento multifluxo NÃO tem suporte em nenhuma versão da KCL 1.x.
-
MultistreamTracker interface
Para criar um aplicativo de consumidor que possa processar vários fluxos ao mesmo tempo, você deve implementar uma nova interface chamada MultistreamTracker
. Essa interface inclui o método streamConfigList
, que retorna a lista de fluxos de dados, e suas configurações, a serem processados pela aplicação de consumo da KCL. Observe que os fluxos de dados sendo processados podem ser alterados durante o runtime da aplicação de consumo.streamConfigList
é chamado periodicamente pela KCL para obter informações das mudanças nos fluxos de dados a serem processados.O
streamConfigList
método preenche a StreamConfiglista. package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }
Observe que os campos
StreamIdentifier
eInitialPositionInStreamExtended
são obrigatórios, enquantoconsumerArn
é opcional. Só é necessário fornecerconsumerArn
se a KCL 2.x estiver sendo usada para implementar uma aplicação de consumo de distribuição avançada.Para obter mais informações sobre
StreamIdentifier
, consulte https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java #L129. Para criar um StreamIdentifier
, recomenda-se a criação de uma instância multifluxo a partir dostreamArn
e dostreamCreationEpoch
que esteja disponível na v2.5.0 e versões posteriores. Na KCL v2.3 e v2.4, que não oferecem suporte aostreamArm
, crie uma instância multifluxo usando o formatoaccount-id:StreamName:streamCreationTimestamp
. Esse formato será descontinuado e não terá mais suporte a partir da próxima versão principal.MultistreamTracker
também inclui uma estratégia para excluir concessões de fluxos antigos na tabela de concessões (formerStreamsLeasesDeletionStrategy
). Observe que a estratégia NÃO PODE ser alterada durante o runtime da aplicação de consumo. Para obter mais informações, consulte https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b//amazon-kinesis-client.java src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy -
ConfigsBuilder
é uma classe de todo o aplicativo que você pode usar para especificar todas as configurações do KCL 2.x a serem usadas ao criar seu aplicativo consumidor KCL. ConfigsBuilder
a classe agora tem suporte para aMultistreamTracker
interface. Você pode inicializar ConfigsBuilder com o nome do único fluxo de dados do qual consumir registros:/** * Constructor to initialize ConfigsBuilder with StreamName * @param streamName * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.right(streamName); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
Ou você pode inicializar ConfigsBuilder com
MultiStreamTracker
se quiser implementar um aplicativo consumidor KCL que processe vários fluxos ao mesmo tempo.* Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
-
Com o suporte multifluxo implementado na aplicação de consumo da KCL, cada linha da tabela de concessões da aplicação contém o ID do fragmento e o nome do fluxo que a aplicação processa.
-
Quando o suporte multifluxo para sua aplicação de consumo da KCL é implementado, leaseKey assume a estrutura
account-id:StreamName:streamCreationTimestamp:ShardId
. Por exemplo,111111111:multiStreamTest-1:12345:shardId-000000000336
.Importante
Quando sua aplicação de consumo da KCL está configurada para processar somente um fluxo de dados, leaseKey (a chave de hash da tabela de concessões) é o ID do fragmento. Ao reconfigurar a aplicação de consumo da KCL existente para processar vários fluxos de dados, a tabela de concessões será quebrada, pois no suporte multifluxo, a estrutura leaseKey deve ser a
account-id:StreamName:StreamCreationTimestamp:ShardId
.
Use o KCL com o Registro do AWS Glue Esquema
Você pode integrar seus streams de dados do Kinesis com o AWS Glue Schema Registry. O registro de esquemas do AWS Glue permite detectar, controlar e evoluir esquemas centralmente, ao mesmo tempo que garante que os dados produzidos sejam validados continuamente por um esquema registrado. O esquema define a estrutura e o formato de um registro de dados. Um esquema é uma especificação versionada para publicação, consumo ou datastore confiáveis. O AWS Glue Schema Registry permite que você end-to-end melhore a qualidade e a governança de dados em seus aplicativos de streaming. Para obter mais informações, consulte Registro de esquemas do AWS Glue. Uma das formas de configurar essa integração é usando a KCL em Java.
Importante
Atualmente, a integração do Kinesis Data AWS Glue Streams e do Schema Registry só é compatível com os streams de dados do Kinesis que usam consumidores do KCL 2.3 implementados em Java. Não há suporte para múltiplas linguagens. Os clientes da KCL 1.0 não são compatíveis. Os clientes da KCL 2.x anteriores à KCL 2.3 não são compatíveis.
Para obter instruções detalhadas sobre como configurar a integração do Kinesis Data Streams com o Schema Registry usando o KCL, consulte a seção “Interagindo com dados usando as bibliotecas KPL/KCL” em Caso de uso: Integrando o Amazon Kinesis Data Streams com o Glue Schema Registry. AWS