

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Desenvolva consumidores com o AWS SDK para Java
<a name="develop-consumers-sdk"></a>

 Você pode desenvolver consumidores personalizados usando o Amazon Kinesis APIs Data Streams. Esta seção descreve o uso do Kinesis APIs Data Streams AWS SDK para Java com o.

**Importante**  
O método recomendado para desenvolvimento de consumidores personalizados do Kinesis Data Streams com throughput compartilhada envolve o uso da Kinesis Client Library (KCL). A KCL ajuda a consumir e processar dados de um fluxo de dados do Kinesis lidando com muitas das tarefas complexas associadas à computação distribuída. Para obter mais informações, consulte [Desenvolver consumidores com a KCL em Java](develop-kcl-consumers-java.md).

**Topics**
+ [Desenvolva consumidores com produtividade compartilhada com o AWS SDK para Java](developing-consumers-with-sdk.md)
+ [Desenvolva consumidores expandidos aprimorados com o AWS SDK para Java](building-enhanced-consumers-api.md)
+ [Interaja com os dados usando o AWS Glue Schema Registry](building-enhanced-consumers-glue-schema-registry.md)

# Desenvolva consumidores com produtividade compartilhada com o AWS SDK para Java
<a name="developing-consumers-with-sdk"></a>

Um dos métodos para desenvolver consumidores personalizados do Kinesis Data Streams com compartilhamento total é usar o Amazon APIs Kinesis Data Streams com o. AWS SDK para Java Esta seção descreve o uso do Kinesis APIs Data Streams AWS SDK para Java com o. Você pode chamar o Kinesis APIs Data Streams usando outras linguagens de programação diferentes. Para obter mais informações sobre todas as opções disponíveis AWS SDKs, consulte [Comece a desenvolver com a Amazon Web Services](https://aws.amazon.com/developers/getting-started/). 

O código de exemplo Java nesta seção demonstra como executar operações básicas da API do Kinesis Data Streams e está dividido logicamente por tipo de operação. Esses exemplos não representam um código pronto para produção. Eles não verificam todas as exceções possíveis nem levam em conta todas as considerações de segurança ou desempenho possíveis. 

**Topics**
+ [Como obter dados de um fluxo](#kinesis-using-sdk-java-get-data)
+ [Como usar iteradores de fragmentos](#kinesis-using-sdk-java-get-data-shard-iterators)
+ [Use GetRecords](#kinesis-using-sdk-java-get-data-getrecords)
+ [Adaptar para uma nova fragmentação](#kinesis-using-sdk-java-get-data-reshard)

## Como obter dados de um fluxo
<a name="kinesis-using-sdk-java-get-data"></a>

Os Kinesis APIs Data Streams `getShardIterator` incluem `getRecords` os métodos e que você pode invocar para recuperar registros de um stream de dados. Esse é o modelo de pull, em que o código extrai registros de dados diretamente dos fragmentos do fluxo de dados.

**Importante**  
Recomenda-se usar o suporte do processador de registros fornecido pela KCL para recuperar registros dos fluxos de dados. Esse é o modelo push, no qual é implementado o código que processa os dados. A KCL recupera registros de dados do fluxo de dados e os entrega ao código da aplicação. Além disso, a KCL fornece as funcionalidades de failover, recuperação e balanceamento de carga. Para obter mais informações, consulte [Desenvolver consumidores personalizados com throughput compartilhada usando a KCL](https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html).

No entanto, em alguns casos, talvez você prefira usar o Kinesis APIs Data Streams. Um exemplo disso é a implementação de ferramentas personalizadas de monitoramento ou depuração dos fluxos de dados.

**Importante**  
O Kinesis Data Streams oferece suporte a alterações do período de retenção do registro de dados no fluxo de dados. Para obter mais informações, consulte [Alterar o período de retenção de dados](kinesis-extended-retention.md).

## Como usar iteradores de fragmentos
<a name="kinesis-using-sdk-java-get-data-shard-iterators"></a>

Os registros do fluxo são recuperados por fragmentos. Para cada fragmento e para cada lote de registros que recupera desse fragmento, é necessário obter um *iterador de fragmentos*. O iterador de fragmentos é usado no objeto `getRecordsRequest` para especificar o fragmento a partir do qual os registros devem ser recuperados. O tipo associado ao iterador de fragmentos determina o ponto no fragmento a partir do qual os registros devem ser recuperados (veja mais à frente nesta seção para obter mais detalhes). Para trabalhar com o iterador de fragmentos, é necessário recuperar o fragmento. Para obter mais informações, consulte [Listar fragmentos](kinesis-using-sdk-java-list-shards.md).

Obtenha o iterador de fragmentos inicial usando o método `getShardIterator`. Obtenha iteradores de fragmentos para obter mais lotes de registros usando o método `getNextShardIterator` do objeto`getRecordsResult` retornado pelo método `getRecords`. Um iterador de fragmentos é válido por 5 minutos. Se um iterador de fragmentos for usado durante sua validade, um novo será fornecido. Cada iterador de fragmentos permanecerá válido por 5 minutos, mesmo depois de ser usado.

Para obter o iterador de fragmentos inicial, instancie `GetShardIteratorRequest` e passe-o ao método `getShardIterator`. Para configurar a solicitação, especifique o fluxo e o ID do fragmento. Para obter informações sobre como obter os streams em sua AWS conta, consulte[Listar fluxos](kinesis-using-sdk-java-list-streams.md). Para obter informações sobre como obter os fragmentos em um fluxo, consulte [Listar fragmentos](kinesis-using-sdk-java-list-shards.md).

```
String shardIterator;
GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
getShardIteratorRequest.setStreamName(myStreamName);
getShardIteratorRequest.setShardId(shard.getShardId());
getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON");

GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest);
shardIterator = getShardIteratorResult.getShardIterator();
```

Esse código de exemplo especifica `TRIM_HORIZON` como o tipo de iterador ao obter o iterador de fragmentos inicial. Esse tipo de iterador significa que o retorno dos registros deve começar a partir do primeiro registro adicionado ao fragmento, em vez de começar pelo registro adicionado mais recentemente, também conhecido como *extremidade*. Estes são tipos de iterador possíveis:
+ `AT_SEQUENCE_NUMBER`
+ `AFTER_SEQUENCE_NUMBER`
+ `AT_TIMESTAMP`
+ `TRIM_HORIZON`
+ `LATEST`

Para obter mais informações, consulte [ShardIteratorType](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#Kinesis-GetShardIterator-request-ShardIteratorType).

Alguns tipos de iterador exigem que um número de sequência seja especificado, além do tipo. Por exemplo:

```
getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER");
getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);
```

Depois de obter um registro usando `getRecords`, pode-se conseguir o número de sequência do registro chamando o método `getSequenceNumber` do registro. 

```
record.getSequenceNumber()
```

Além disso, o código que adiciona registros ao fluxo de dados pode obter o número de sequência de um registro adicional chamando `getSequenceNumber` no resultado de `putRecord`. 

```
lastSequenceNumber = putRecordResult.getSequenceNumber();
```

É possível usar números sequenciais para garantir estritamente o ordenamento crescente do registros. Para obter mais informações, consulte o exemplo de código em [PutRecord exemplo](developing-producers-with-sdk.md#kinesis-using-sdk-java-putrecord-example).

## Use GetRecords
<a name="kinesis-using-sdk-java-get-data-getrecords"></a>

Após obter o iterador de fragmentos, instancie um objeto `GetRecordsRequest`. Especifique o iterador para a solicitação usando o método `setShardIterator`. 

Opcionalmente, também é possível definir o número de registros a recuperar usando o método `setLimit`. O número de registros retornados por `getRecords` sempre é igual ou menor que esse limite. Se esse limite não for especificado, `getRecords` retornará 10 MB de registros recuperados. O código de exemplo a seguir define esse limite para 25 registros.

Se nenhum registro for retornado, isso significa que não há registros de dados disponíveis atualmente nesse fragmento no número de sequência referenciado pelo iterador de fragmentos. Nessa situação, a aplicação deve aguardar o tempo adequado de acordo com as fontes de dados do fluxo. Em seguida, tente obter os dados do fragmento novamente usando o iterador de fragmentos retornado pela chamada anterior a `getRecords`. 

Passe o `getRecordsRequest` para o método `getRecords` e capture o valor retornado como um objeto `getRecordsResult`. Para obter os registros de dados, chame o método `getRecords` no objeto `getRecordsResult`. 

```
GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
getRecordsRequest.setShardIterator(shardIterator);
getRecordsRequest.setLimit(25);

GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest);
List<Record> records = getRecordsResult.getRecords();
```

Para preparar-se para outra chamada para `getRecords`, obtenha o próximo iterador de fragmentos de `getRecordsResult`. 

```
shardIterator = getRecordsResult.getNextShardIterator();
```

Para obter os melhores resultados, aguarde pelo menos 1 segundo (1.000 milissegundos) entre as chamadas para `getRecords` a fim de evitar exceder o limite na frequência de `getRecords`. 

```
try {
  Thread.sleep(1000);
}
catch (InterruptedException e) {}
```

Normalmente, deve-se chamar `getRecords` em um loop, mesmo se estiver recuperando um único registro em um cenário de teste. Uma única chamada para `getRecords` pode retornar uma lista de registros vazia, mesmo quando o fragmento contém mais registros em números de sequência subsequentes. Quando isso ocorre, o `NextShardIterator` retornado com a lista de registros vazia faz referência a um número de sequência subsequente no fragmento, e as chamadas posteriores a `getRecords` acabam retornando os registros. O exemplo a seguir demonstra o uso de um loop.

**Exemplo: getRecords**  
O código de exemplo a seguir reflete as dicas de `getRecords` nesta seção, inclusive chamadas em um loop.

```
// Continuously read data records from a shard
List<Record> records;
    
while (true) {
   
  // Create a new getRecordsRequest with an existing shardIterator 
  // Set the maximum records to return to 25
  
  GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
  getRecordsRequest.setShardIterator(shardIterator);
  getRecordsRequest.setLimit(25); 

  GetRecordsResult result = client.getRecords(getRecordsRequest);
  
  // Put the result into record list. The result can be empty.
  records = result.getRecords();
  
  try {
    Thread.sleep(1000);
  } 
  catch (InterruptedException exception) {
    throw new RuntimeException(exception);
  }
  
  shardIterator = result.getNextShardIterator();
}
```

Se a Kinesis Client Library estiver sendo usada, ela poderá fazer várias chamadas antes de retornar dados. Esse comportamento é projetado e não indica um problema com a KCL ou com seus dados.

## Adaptar para uma nova fragmentação
<a name="kinesis-using-sdk-java-get-data-reshard"></a>

 `getRecordsResult.getNextShardIterator` retorna `null` para indicar que o fragmento passou por uma divisão ou uma mesclagem. O fragmento agora está em estado de `CLOSED`, e todos os registros de dados disponíveis nele foram lidos. 

 Nesse cenário, pode-se usar `getRecordsResult.childShards` para conhecer os fragmentos filho que foram criados pela divisão ou mesclagem do fragmento sendo processado. Para obter mais informações, consulte [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html).

 No caso de uma divisão, os dois novos fragmentos têm `parentShardId` igual ao ID do fragmento que estavam sendo processados anteriormente. O valor de `adjacentParentShardId` dos dois fragmentos é `null`. 

 No caso de uma fusão, o fragmento unificado criado terá `parentShardId` igual ao ID de um dos fragmentos pai e `adjacentParentShardId` igual ao ID do outro fragmento. Seu aplicativo já leu todos os dados de um desses fragmentos. Esse é o fragmento para o qual `getRecordsResult.getNextShardIterator` retornou `null`. Se a ordem dos dados for importante para o aplicativo, deve-se garantir que ele também leia todos os dados de outro fragmento pai antes de ler qualquer dado novo de fragmento filho criado pela fusão. 

 Se vários processadores forem usados para recuperar dados do fluxo (digamos, um processador por fragmento) e ocorrer uma divisão ou fusão de fragmentos, deve-se aumentar ou diminuir o número de processadores para se adaptar à alteração no número de fragmentos. 

 Para obter mais informações sobre a refragmentação, incluindo uma discussão sobre estados de fragmentos (como `CLOSED`), consulte [Refragmentar um fluxo](kinesis-using-sdk-java-resharding.md). 

# Desenvolva consumidores expandidos aprimorados com o AWS SDK para Java
<a name="building-enhanced-consumers-api"></a>

A *distribuição avançada* é um recurso do Amazon Kinesis Data Streams que permite que os consumidores recebam registros de um fluxo de dados com throughput dedicada de até 2 MB de dados por segundo por fragmento. Um consumidor que usa distribuição avançada não precisa lidar com outros consumidores que estejam recebendo dados do fluxo. Para obter mais informações, consulte [Desenvolver consumidores de distribuição avançada com throughput dedicado](enhanced-consumers.md).

É possível usar operações de API para criar um consumidor que usa a distribuição avançada no Kinesis Data Streams.

**Para registrar um consumidor com distribuição avançada usando a API do Kinesis Data Streams**

1. Ligue [RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html)para registrar seu aplicativo como um consumidor que usa fan-out aprimorado. O Kinesis Data Streams gera um nome do recurso da Amazon (ARN) para o consumidor e o retorna na resposta.

1. Para começar a ouvir um fragmento específico, passe o ARN do consumidor em uma chamada para. [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html) Em seguida, o Kinesis Data Streams começa a enviar os registros desse fragmento para você, na forma de eventos [SubscribeToShardEvent](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShardEvent.html)do tipo em uma conexão HTTP/2. A conexão permanece aberta por até 5 minutos. Ligue [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)novamente se quiser continuar recebendo registros do fragmento após o `future` retorno da chamada para ser [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)concluído normalmente ou excepcionalmente.
**nota**  
`SubscribeToShard` A API também retorna a lista dos fragmentos filho do fragmento atual quando chega ao final do fragmento. 

1. Para cancelar o registro de um consumidor que está usando o fan-out aprimorado, ligue. [DeregisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeregisterStreamConsumer.html)

O de código a seguir é um exemplo de como é possível inscrever o consumidor em um fragmento, renovar a assinatura periodicamente e manipular os eventos.

```
    import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
    import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
     
    import java.util.concurrent.CompletableFuture;
     
    /**
     * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java
     * for complete code and more examples.
     */
    public class SubscribeToShardSimpleImpl {
     
        private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737";
        private static final String SHARD_ID = "shardId-000000000000";
     
        public static void main(String[] args) {
     
            KinesisAsyncClient client = KinesisAsyncClient.create();
     
            SubscribeToShardRequest request = SubscribeToShardRequest.builder()
                    .consumerARN(CONSUMER_ARN)
                    .shardId(SHARD_ID)
                    .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build();
     
            // Call SubscribeToShard iteratively to renew the subscription periodically.
            while(true) {
                // Wait for the CompletableFuture to complete normally or exceptionally.
                callSubscribeToShardWithVisitor(client, request).join();
            }
     
            // Close the connection before exiting.
            // client.close();
        }
     
     
        /**
         * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface.
         */
        private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) {
            SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() {
                @Override
                public void visit(SubscribeToShardEvent event) {
                    System.out.println("Received subscribe to shard event " + event);
                }
            };
            SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
                    .builder()
                    .onError(t -> System.err.println("Error during stream - " + t.getMessage()))
                    .subscriber(visitor)
                    .build();
            return client.subscribeToShard(request, responseHandler);
        }
    }
```

 `event.ContinuationSequenceNumber` retorna `null` para indicar que o fragmento passou por uma divisão ou uma mesclagem. O fragmento agora está em estado de `CLOSED`, todos os registros de dados disponíveis nele foram lidos. Nesse cenário, como mostrado no exemplo acima, é possível usar `event.childShards` para conhecer os fragmentos filho que foram criados pela divisão ou mesclagem do fragmento sendo processado. Para obter mais informações, consulte [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html).

# Interaja com os dados usando o AWS Glue Schema Registry
<a name="building-enhanced-consumers-glue-schema-registry"></a>

Você pode integrar seus streams de dados do Kinesis com o AWS Glue Schema Registry. O registro de esquemas do AWS Glue permite detectar, controlar e evoluir esquemas centralmente, ao mesmo tempo que garante que os dados produzidos sejam validados continuamente por um esquema registrado. O esquema define a estrutura e o formato de um registro de dados. Um esquema é uma especificação versionada para publicação, consumo ou datastore confiáveis. O AWS Glue Schema Registry permite que você melhore a qualidade end-to-end dos dados e a governança de dados em seus aplicativos de streaming. Para obter mais informações, consulte [Registro de esquemas do AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). Uma das formas de configurar essa integração é por meio da API `GetRecords` Kinesis Data Streams, disponível AWS no SDK Java. 

Para obter instruções detalhadas sobre como configurar a integração do Kinesis Data Streams com o Schema Registry `GetRecords` usando o Kinesis Data Streams, consulte a seção “Interagindo com dados usando o APIs Kinesis Data Streams” [em Caso de uso: Integração](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds) do Amazon Kinesis Data APIs Streams com o Glue Schema Registry. AWS 