

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Desenvolva produtores usando a API Amazon Kinesis Data Streams com o AWS SDK para Java
<a name="developing-producers-with-sdk"></a>

Você pode desenvolver produtores usando a API Amazon Kinesis Data Streams AWS com o SDK for Java. Se você nunca usou o Kinesis Data Streams, comece familiarizando-se com os conceitos e a terminologia apresentados em [O que é o Amazon Kinesis Data Streams?](introduction.md) e [Use o AWS CLI para realizar operações do Amazon Kinesis Data Streams](getting-started.md).

Esses exemplos discutem a [API do Kinesis Data Streams](https://docs.aws.amazon.com/kinesis/latest/APIReference/) e usam o [AWS SDK para Java](https://aws.amazon.com/sdk-for-java/) para adicionar (colocar) dados a um fluxo. Contudo, na maioria dos casos de uso, é melhor usar a biblioteca KPL do Kinesis Data Streams. Para obter mais informações, consulte [Desenvolver produtores usando a Amazon Kinesis Producer Library (KPL)](developing-producers-with-kpl.md).

O código Java de exemplo neste capítulo demonstra como executar operações básicas da API do Kinesis Data Streams e está dividido logicamente por tipo de operação. Esses exemplos não representam um código pronto para produção, pois não verificam todas as exceções possíveis nem abrangem todas as considerações de segurança ou de performance possíveis. Também é possível chamar a [API do Kinesis Data Streams](https://docs.aws.amazon.com/kinesis/latest/APIReference/) usando outras linguagens de programação. Para obter mais informações sobre todas as opções disponíveis AWS SDKs, consulte [Comece a desenvolver com a Amazon Web Services](https://aws.amazon.com/developers/getting-started/).

Cada tarefa tem pré-requisitos. Por exemplo, não é possível adicionar dados a um fluxo enquanto o fluxo não é criado, o que requer a criação de um cliente. Para obter mais informações, consulte [Criar e gerenciar fluxos de dados do Kinesis](working-with-streams.md).

**Topics**
+ [Adicionar dados a um fluxo](#kinesis-using-sdk-java-add-data-to-stream)
+ [Interaja com os dados usando o AWS Glue Schema Registry](kinesis-integration-glue-schema-registry.md)

## Adicionar dados a um fluxo
<a name="kinesis-using-sdk-java-add-data-to-stream"></a>

Quando um fluxo é criado, pode-se adicionar dados a ele na forma de registros. Um registro é uma estrutura de dados que contém os dados a serem processados na forma de um blob de dados. Depois de armazenar os dados no registro, o Kinesis Data Streams não inspeciona, interpreta nem altera dados de forma alguma. Cada registro também tem um número sequencial e uma chave de partição associados.

Há duas operações diferentes na API do Kinesis Data Streams que adicionam dados a um fluxo, [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) e [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html). A operação `PutRecords` envia vários registros ao fluxo por solicitação HTTP e a operação singular `PutRecord` envia registros ao fluxo um por vez (uma solicitação HTTP separada é necessária para cada registro). Talvez convenha usar `PutRecords` para a maioria dos aplicativos, pois ele atingirá um throughput mais alto por produtor de dados. Para obter mais informações sobre cada uma dessas operações, consulte as subseções abaixo.

**Topics**
+ [Adicione vários registros com PutRecords](#kinesis-using-sdk-java-putrecords)
+ [Adicione um único registro com PutRecord](#kinesis-using-sdk-java-putrecord)

Sempre tenha em mente que, como a aplicação de origem está adicionando dados ao fluxo usando a API do Kinesis Data Streams, provavelmente há uma ou mais aplicações de consumo processando dados fora do fluxo simultaneamente. Para obter informações sobre como os consumidores obtêm dados usando a API do Kinesis Data Streams, consulte [Como obter dados de um fluxo](developing-consumers-with-sdk.md#kinesis-using-sdk-java-get-data).

**Importante**  
[Alterar o período de retenção de dados](kinesis-extended-retention.md)

### Adicione vários registros com PutRecords
<a name="kinesis-using-sdk-java-putrecords"></a>

A operação [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) envia vários registros ao Kinesis Data Streams em uma única solicitação. Ao usar `PutRecords`, os produtores podem obter um throughput mais alto ao enviar dados para o fluxo de dados do Kinesis. Cada solicitação `PutRecords` pode oferecer suporte a até 500 registros. Cada registro na solicitação pode ter no máximo 1 MB, até um limite de 5 MB para toda a solicitação, incluindo chaves de partição. Assim como a operação única `PutRecord` descrita abaixo, `PutRecords` usa números de sequência e chaves de partição. No entanto, o parâmetro `PutRecord` de `SequenceNumberForOrdering` não é incluído em uma chamada a `PutRecords`. A operação `PutRecords` tenta processar todos os registros na ordem natural da solicitação. 

Cada registro de dados tem um número sequencial exclusivo. O número de sequência é atribuído pelo Kinesis Data Streams depois que `client.putRecords` é chamada para adicionar os registros de dados ao fluxo. Os números sequenciais da mesma chave de partição geralmente aumentam com o tempo: quanto maior o período entre as solicitações `PutRecords`, maiores ficam os números sequenciais.

**nota**  
Os números de sequência não podem ser usados como índices para conjuntos de dados dentro do mesmo fluxo. Para separar logicamente conjuntos de dados, use chaves de partição ou crie um fluxo separado para cada conjunto de dados.

Uma solicitação `PutRecords` pode incluir registros com diferentes chaves de partição. O escopo da solicitação é um fluxo; cada solicitação pode incluir qualquer combinação de chaves de partição e registros, dentro dos limites da solicitação. As solicitações feitas com diferentes chaves de partição a streams com muitos fragmentos diferentes costumam ser mais rápidas do que as solicitações com um pequeno número de chaves de partição para um pequeno número de fragmentos. O número de chaves de partição deve ser muito maior do que o número de fragmentos para reduzir a latência e maximizar o throughput.

#### PutRecords exemplo
<a name="kinesis-using-sdk-java-putrecords-example"></a>

O código a seguir cria 100 registros de dados com chaves de partição sequenciais e os coloca em um fluxo denominado `DataStream`. 

```
        AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
        
        clientBuilder.setRegion(regionName);
        clientBuilder.setCredentials(credentialsProvider);
        clientBuilder.setClientConfiguration(config);
        
        AmazonKinesis kinesisClient = clientBuilder.build();
 
        PutRecordsRequest putRecordsRequest  = new PutRecordsRequest();
        putRecordsRequest.setStreamName(streamName);
        List <PutRecordsRequestEntry> putRecordsRequestEntryList  = new ArrayList<>(); 
        for (int i = 0; i < 100; i++) {
            PutRecordsRequestEntry putRecordsRequestEntry  = new PutRecordsRequestEntry();
            putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes()));
            putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i));
            putRecordsRequestEntryList.add(putRecordsRequestEntry); 
        }

        putRecordsRequest.setRecords(putRecordsRequestEntryList);
        PutRecordsResult putRecordsResult  = kinesisClient.putRecords(putRecordsRequest);
        System.out.println("Put Result" + putRecordsResult);
```

A resposta a `PutRecords` inclui uma matriz de resposta `Records`. Cada registro na matriz de resposta se correlaciona diretamente com um registro na matriz de solicitação por ordenação natural, do início ao fim da solicitação e da resposta. A matriz de resposta de `Records` sempre inclui o mesmo número de registros da matriz de solicitação.

#### Lidar com falhas ao usar PutRecords
<a name="kinesis-using-sdk-java-putrecords-handling-failures"></a>

Por padrão, a falha de registros individuais em uma solicitação não interrompe o processamento de registros subsequentes em uma solicitação `PutRecords`. Isso significa que uma matriz de resposta `Records` inclui os registros com processamento bem-sucedido e malsucedido. É preciso detectar os registros com processamento malsucedido e incluí-los em uma chamada subsequente. 

Os registros bem-sucedidos incluem os valores `SequenceNumber` e `ShardID`, e os registros malsucedidos incluem os valores `ErrorCode` e `ErrorMessage`. O parâmetro `ErrorCode` reflete o tipo de erro e pode ter um dos seguintes valores: `ProvisionedThroughputExceededException` ou `InternalFailure`. `ErrorMessage` fornece informações mais detalhadas sobre a exceção `ProvisionedThroughputExceededException`, incluindo o ID da conta, o nome do fluxo e o ID do fragmento do registro que foi limitado. O exemplo abaixo tem três registros em uma solicitação `PutRecords`. O segundo registro falha e isso é refletido na resposta. 

**Example PutRecords Sintaxe da solicitação**  

```
{
    "Records": [
        {
    	"Data": "XzxkYXRhPl8w",
	    "PartitionKey": "partitionKey1"
        },
        {
    	"Data": "AbceddeRFfg12asd",
	    "PartitionKey": "partitionKey1"	
        },
        {
    	"Data": "KFpcd98*7nd1",
	    "PartitionKey": "partitionKey3"
        }
    ],
    "StreamName": "myStream"
}
```

**Example PutRecords Sintaxe de resposta**  

```
{
    "FailedRecordCount”: 1,
    "Records": [
        {
	    "SequenceNumber": "21269319989900637946712965403778482371",
	    "ShardId": "shardId-000000000001"

        },
        {
	    “ErrorCode":”ProvisionedThroughputExceededException”,
	    “ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111."

        },
        {
	    "SequenceNumber": "21269319989999637946712965403778482985",
	    "ShardId": "shardId-000000000002"
        }
    ]
}
```

Os registros com processamento malsucedido podem ser incluídos nas solicitações `PutRecords` subsequentes. Primeiro, verifique o parâmetro `FailedRecordCount` no `putRecordsResult` para confirmar se há registros com falha na solicitação. Assim sendo, cada `putRecordsEntry` com um `ErrorCode` que não seja `null` deve ser adicionado a uma solicitação subsequente. Para obter um exemplo desse tipo de handler, consulte o seguinte código.

**Example PutRecords manipulador de falhas**  

```
PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
putRecordsRequest.setStreamName(myStreamName);
List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
for (int j = 0; j < 100; j++) {
    PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
    putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(j).getBytes()));
    putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", j));
    putRecordsRequestEntryList.add(putRecordsRequestEntry);
}

putRecordsRequest.setRecords(putRecordsRequestEntryList);
PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest);

while (putRecordsResult.getFailedRecordCount() > 0) {
    final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>();
    final List<PutRecordsResultEntry> putRecordsResultEntryList = putRecordsResult.getRecords();
    for (int i = 0; i < putRecordsResultEntryList.size(); i++) {
        final PutRecordsRequestEntry putRecordRequestEntry = putRecordsRequestEntryList.get(i);
        final PutRecordsResultEntry putRecordsResultEntry = putRecordsResultEntryList.get(i);
        if (putRecordsResultEntry.getErrorCode() != null) {
            failedRecordsList.add(putRecordRequestEntry);
        }
    }
    putRecordsRequestEntryList = failedRecordsList;
    putRecordsRequest.setRecords(putRecordsRequestEntryList);
    putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest);
}
```

### Adicione um único registro com PutRecord
<a name="kinesis-using-sdk-java-putrecord"></a>

Cada chamada para [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html) opera em um único registro. Prefira a operação `PutRecords` descrita em [Adicione vários registros com PutRecords](#kinesis-using-sdk-java-putrecords), a menos que seu aplicativo precise especificamente enviar sempre registos únicos por solicitação ou algum outro motivo para o não uso de `PutRecords`.

Cada registro de dados tem um número sequencial exclusivo. O número de sequência é atribuído pelo Kinesis Data Streams depois que `client.putRecord` é chamada para adicionar o registro de dados ao fluxo. Os números sequenciais da mesma chave de partição geralmente aumentam com o tempo: quanto maior o período entre as solicitações `PutRecord`, maiores ficam os números sequenciais.

 Quando ocorrem colocações em rápida sucessão, não há garantia de que os números de sequência retornados aumentem, porque as operações put aparentam ser essencialmente simultâneas para o Kinesis Data Streams. Para garantir estritamente o aumento de números sequenciais para a mesma chave de partição, use o parâmetro `SequenceNumberForOrdering`, como mostrado no código de exemplo em [PutRecord exemplo](#kinesis-using-sdk-java-putrecord-example). 

 Usando ou não `SequenceNumberForOrdering`, os registros que o Kinesis Data Streams recebe por meio de uma chamada a `GetRecords` são estritamente ordenados por número de sequência. 

**nota**  
Os números de sequência não podem ser usados como índices para conjuntos de dados dentro do mesmo fluxo. Para separar logicamente conjuntos de dados, use chaves de partição ou crie um fluxo separado para cada conjunto de dados.

Uma chave de partição é usada para agrupar os dados dentro de um fluxo. Um registro de dados é atribuído a um fragmento dentro do fluxo com base em sua chave de partição. Especificamente, o Kinesis Data Streams usa a chave de partição como entrada para uma função de hash que mapeia essa chave (e os dados associados) a um determinado fragmento.

 Como resultado desse mecanismo de hashing, todos os registros de dados com a mesma chave de partição são mapeados para o mesmo fragmento no fluxo. No entanto, se o número de chaves de partição ultrapassar o número de fragmentos, alguns fragmentos conterão necessariamente registros com chaves de partição diferentes. Do ponto de vista do design, para garantir que todos os seus fragmentos sejam bem utilizados, o número de fragmentos (especificado pelo método `setShardCount` de `CreateStreamRequest`) deve ser substancialmente menor que o número de chaves de partição exclusivas, e o volume de dados que flui para uma única chave de partição deve ser substancialmente menor que a capacidade do fragmento. 

#### PutRecord exemplo
<a name="kinesis-using-sdk-java-putrecord-example"></a>

O código a seguir cria dez registros de dados, distribuídos entre duas chaves de partição, e os coloca em um fluxo denominado `myStreamName`.

```
for (int j = 0; j < 10; j++) 
{
  PutRecordRequest putRecordRequest = new PutRecordRequest();
  putRecordRequest.setStreamName( myStreamName );
  putRecordRequest.setData(ByteBuffer.wrap( String.format( "testData-%d", j ).getBytes() ));
  putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 ));  
  putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord );
  PutRecordResult putRecordResult = client.putRecord( putRecordRequest );
  sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber();
}
```

O código de exemplo anterior usa `setSequenceNumberForOrdering` para garantir estritamente o aumento da ordenação dentro de cada chave de partição. Para usar esse parâmetro de forma eficaz, defina o `SequenceNumberForOrdering` do registro atual (registro *n*) como o número de sequência do registro anterior (registro *n-1*). Para obter o número sequencial de um registro que foi adicionado ao fluxo, chame `getSequenceNumber` para o resultado de `putRecord`.

O parâmetro `SequenceNumberForOrdering` garante estritamente o aumento de números de sequência para a mesma chave de partição. `SequenceNumberForOrdering` não fornece a ordenação de registros em várias chaves de partição. 

# Interaja com os dados usando o AWS Glue Schema Registry
<a name="kinesis-integration-glue-schema-registry"></a>

Você pode integrar seus streams de dados do Kinesis com o AWS Glue Schema Registry. O registro de esquemas do AWS Glue permite detectar, controlar e evoluir esquemas centralmente, ao mesmo tempo que garante que os dados produzidos sejam validados continuamente por um esquema registrado. O esquema define a estrutura e o formato de um registro de dados. Um esquema é uma especificação versionada para publicação, consumo ou datastore confiáveis. O AWS Glue Schema Registry permite que você melhore a qualidade end-to-end dos dados e a governança de dados em seus aplicativos de streaming. Para obter mais informações, consulte [Registro de esquemas do AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). Uma das formas de configurar essa integração é por meio do `PutRecords` `PutRecord` Kinesis APIs Data Streams disponível AWS no Java SDK. 

Para obter instruções detalhadas sobre como configurar a integração do Kinesis Data Streams com o registro do esquema usando o e o PutRecord Kinesis Data Streams, consulte a seção “Interagindo com dados PutRecords usando o APIs Kinesis Data Streams” [em Caso de uso: Integração](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds) do Amazon Kinesis Data APIs Streams com o Glue Schema Registry. AWS 