Desenvolva consumidores personalizados com taxa de transferência compartilhada usando o AWS SDK for Java - Amazon Kinesis Data Streams

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Desenvolva consumidores personalizados com taxa de transferência compartilhada usando o AWS SDK for Java

Um dos métodos para desenvolver consumidores personalizados do Kinesis Data Streams com compartilhamento total é usar o Amazon Kinesis Data Streams. APIs Esta seção descreve o uso do Kinesis APIs Data Streams AWS SDK com o for Java. O código de amostra Java nesta seção demonstra como realizar KDS API operações básicas e é dividido logicamente por tipo de operação.

Esses exemplos não representam um código pronto para produção. Eles não verificam todas as exceções possíveis nem levam em conta todas as considerações de segurança ou desempenho possíveis.

Você pode chamar o Kinesis APIs Data Streams usando outras linguagens de programação diferentes. Para obter mais informações sobre todas as opções disponíveis AWS SDKs, consulte Comece a desenvolver com a Amazon Web Services.

Importante

O método recomendado para desenvolver consumidores personalizados do Kinesis Data Streams com conteúdo compartilhado é usar a Kinesis Client Library (). KCL KCLajuda você a consumir e processar dados de um stream de dados do Kinesis ao cuidar de muitas das tarefas complexas associadas à computação distribuída. Para obter mais informações, consulte Desenvolvimento de consumidores personalizados com o uso KCL de taxa de transferência compartilhada.

Obter dados de um stream

Os Kinesis APIs Data Streams getShardIterator incluem getRecords os métodos e que você pode invocar para recuperar registros de um stream de dados. Esse é o modelo de pull, em que o código extrai registros de dados diretamente dos fragmentos do fluxo de dados.

Importante

Recomendamos que você use o suporte do processador de registros fornecido pela KCL para recuperar registros de seus fluxos de dados. Esse é o modelo push, em que você implementa o código que processa os dados. O KCL recupera registros de dados do fluxo de dados e os entrega ao código do seu aplicativo. Além disso, KCL fornece funcionalidade de failover, recuperação e balanceamento de carga. Para obter mais informações, consulte Desenvolvimento de consumidores personalizados com o uso KCL de taxa de transferência compartilhada.

No entanto, em alguns casos, talvez você prefira usar o Kinesis APIs Data Streams. Um exemplo disso é a implementação de ferramentas personalizadas de monitoramento ou depuração dos fluxos de dados.

Importante

O Kinesis Data Streams oferece suporte a alterações do período de retenção do registro de dados no fluxo de dados. Para obter mais informações, consulte Alterar o período de retenção de dados.

Use iteradores de fragmentos

Você recupera registros do stream por estilhaço. Para cada estilhaço e para cada lote de registros que recupera desse estilhaço, você precisa obter um iterador de estilhaços. O iterador de estilhaços é usado no objeto getRecordsRequest para especificar o estilhaço a partir do qual os registros devem ser recuperados. O tipo associado ao iterador de estilhaços determina o ponto no estilhaço a partir do qual os registros devem ser recuperados (veja mais à frente nesta seção para obter mais detalhes). Antes de trabalhar com o iterador de fragmento, você deve recuperar o fragmento. Para obter mais informações, consulte Listar fragmentos.

Obtenha o iterador de estilhaços inicial usando o método getShardIterator. Obtenha iteradores de estilhaços para obter mais lotes de registros usando o método getNextShardIterator do objetogetRecordsResult retornado pelo método getRecords. Um iterador de estilhaços é válido por 5 minutos. Se usar um iterador de estilhaços enquanto ele for válido, você receberá um novo. Cada iterador de estilhaços permanecerá válido por 5 minutos, mesmo depois de ser usado.

Para obter o iterador de estilhaços inicial, instancie GetShardIteratorRequest e passe-o ao método getShardIterator. Para configurar a solicitação, especifique o stream e o ID do estilhaço. Para obter informações sobre como obter os streams em sua AWS conta, consulteListar streams. Para obter informações sobre como obter os estilhaços em um stream, consulte Listar fragmentos.

String shardIterator; GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(myStreamName); getShardIteratorRequest.setShardId(shard.getShardId()); getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest); shardIterator = getShardIteratorResult.getShardIterator();

Esse código de exemplo especifica TRIM_HORIZON como o tipo de iterador ao obter o iterador de estilhaços inicial. Esse tipo de iterador significa que o retorno dos registros deve começar a partir do primeiro registro adicionado ao fragmento, em vez de começar pelo registro adicionado mais recentemente, também conhecido como extremidade. Estes são tipos de iterador possíveis:

  • AT_SEQUENCE_NUMBER

  • AFTER_SEQUENCE_NUMBER

  • AT_TIMESTAMP

  • TRIM_HORIZON

  • LATEST

Para obter mais informações, consulte ShardIteratorType.

Alguns tipos de iterador exigem que você especifique um número de sequência, além do tipo. Por exemplo:

getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER"); getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);

Depois de obter um registro usando getRecords, você pode conseguir o número de sequência do registro chamando o método getSequenceNumber do registro.

record.getSequenceNumber()

Além disso, o código que adiciona registros ao stream de dados pode obter o número de sequência de um registro adicional chamando getSequenceNumber no resultado de putRecord.

lastSequenceNumber = putRecordResult.getSequenceNumber();

Você pode usar números de sequência para garantir estritamente o ordenamento crescente do registros. Para obter mais informações, consulte o exemplo de código em PutRecordexemplo.

Use GetRecords

Depois que você obtiver o iterador de estilhaços, instancie um objeto GetRecordsRequest. Especifique o iterador para a solicitação usando o método setShardIterator.

Opcionalmente, você também pode definir o número de registros a recuperar usando o método setLimit. O número de registros retornados por getRecords sempre é igual ou menor que esse limite. Se você não especificar esse limite, getRecords retornará 10 MB de registros recuperados. O código de exemplo a seguir define esse limite para 25 registros.

Se nenhum registro for retornado, isso significa que não há registros de dados disponíveis atualmente nesse estilhaço no número de sequência referenciado pelo iterador de estilhaços. Nessa situação, a aplicação deve aguardar o tempo adequado de acordo com as fontes de dados do fluxo. Em seguida, tente obter os dados do estilhaço novamente usando o iterador de estilhaços retornado pela chamada anterior a getRecords.

Passe o getRecordsRequest para o método getRecords e capture o valor retornado como um objeto getRecordsResult. Para obter os registros de dados, chame o método getRecords no objeto getRecordsResult.

GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest); List<Record> records = getRecordsResult.getRecords();

Para preparar-se para outra chamada para getRecords, obtenha o próximo iterador de estilhaços de getRecordsResult.

shardIterator = getRecordsResult.getNextShardIterator();

Para obter os melhores resultados, aguarde pelo menos 1 segundo (1.000 milissegundos) entre as chamadas para getRecords a fim de evitar exceder o limite na frequência de getRecords.

try { Thread.sleep(1000); } catch (InterruptedException e) {}

Normalmente, você deve chamar getRecords em um loop, mesmo se estiver recuperando um único registro em um cenário de teste. Uma única chamada para getRecords pode retornar uma lista de registros vazia, mesmo quando o estilhaço contém mais registros em números de sequência subsequentes. Quando isso ocorre, o NextShardIterator retornado com a lista de registros vazia faz referência a um número de sequência subsequente no estilhaço, e as chamadas posteriores a getRecords acabam retornando os registros. O exemplo a seguir demonstra o uso de um loop.

Exemplo: getRecords

O código de exemplo a seguir reflete as dicas de getRecords nesta seção, inclusive chamadas em um loop.

// Continuously read data records from a shard List<Record> records; while (true) { // Create a new getRecordsRequest with an existing shardIterator // Set the maximum records to return to 25 GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult result = client.getRecords(getRecordsRequest); // Put the result into record list. The result can be empty. records = result.getRecords(); try { Thread.sleep(1000); } catch (InterruptedException exception) { throw new RuntimeException(exception); } shardIterator = result.getNextShardIterator(); }

Se você estiver usando a Kinesis Client Library, ela poderá fazer várias chamadas antes de retornar dados. Esse comportamento é intencional e não indica um problema com os dados KCL ou com seus dados.

Adapte-se a uma nova fragmentação

getRecordsResult.getNextShardIterator retorna null para indicar que o fragmento passou por uma divisão ou uma mesclagem. O fragmento agora está em estado de CLOSED, e você leu todos os registros de dados disponíveis nele.

Nesse cenário, você pode usar getRecordsResult.childShards para conhecer os fragmentos filho que foram criados pela divisão ou mesclagem do fragmento sendo processado. Para obter mais informações, consulte ChildShard.

No caso de uma divisão, os dois novos estilhaços têm parentShardId igual ao ID do estilhaço que você estava processando anteriormente. O valor de adjacentParentShardId dos dois estilhaços é null.

No caso de uma fusão, o único estilhaço novo criado tem parentShardId igual ao ID de um dos estilhaços pai e adjacentParentShardId igual ao ID do outro estilhaço. Seu aplicativo já leu todos os dados de um desses estilhaços. Esse é o estilhaço para o qual getRecordsResult.getNextShardIterator retornou null. Se a ordem dos dados for importante para o aplicativo, você deverá garantir que ele também leia todos os dados de outro estilhaço pai antes de ler qualquer dado novo de estilhaço filho criado pela fusão.

Se estiver usando vários processadores para recuperar dados do streaming (digamos, um processador por estilhaço) e ocorrer uma divisão ou fusão de estilhaços, você deverá aumentar ou diminuir o número de processadores para se adaptar à alteração no número de estilhaços.

Para obter mais informações sobre a refragmentação, incluindo uma discussão sobre estados de estilhaços (como CLOSED), consulte Refragmentar um stream.