

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Desarrolle productores mediante la API Amazon Kinesis Data Streams con AWS SDK para Java
<a name="developing-producers-with-sdk"></a>

Puede desarrollar productores mediante la API de Amazon Kinesis Data Streams con AWS el SDK for Java. Si es la primera vez que utiliza Kinesis Data Streams, familiarícese antes con los conceptos y los términos que encontrará en [¿Qué es Amazon Kinesis Data Streams?](introduction.md) y [Úselo AWS CLI para realizar operaciones de Amazon Kinesis Data Streams](getting-started.md).

En estos ejemplos se aborda la [API de Kinesis Data Streams](https://docs.aws.amazon.com/kinesis/latest/APIReference/) y se utiliza el [SDK de AWS para Java](https://aws.amazon.com/sdk-for-java/) para insertar datos en un flujo. Sin embargo, en la mayoría de los casos de uso, es preferible optar por la biblioteca KPL de Kinesis Data Streams. Para obtener más información, consulte [Desarrollo de productores con Amazon Kinesis Producer Library (KPL)](developing-producers-with-kpl.md).

El código de ejemplo de Java de este capítulo demuestra cómo realizar operaciones básicas con la API de Kinesis Data Streams y está dividido lógicamente por tipo de operación. Estos ejemplos no representan códigos listos para producción, ya que no comprueban todas las excepciones posibles ni toman en cuenta todas las consideraciones de seguridad y desempeño posibles. Además, puede llamar a la [API de Kinesis Data Streams](https://docs.aws.amazon.com/kinesis/latest/APIReference/) mediante otros lenguajes de programación. Para obtener más información sobre todos los productos disponibles AWS SDKs, consulte [Comience a desarrollar con Amazon Web Services](https://aws.amazon.com/developers/getting-started/).

Cada tarea tiene requisitos previos. Por ejemplo, no se pueden agregar datos a una secuencia hasta que se haya creado una, para lo que se ha de crear un cliente. Para obtener más información, consulte [Crear y administrar Kinesis Data Streams](working-with-streams.md).

**Topics**
+ [Agregar datos a un flujo](#kinesis-using-sdk-java-add-data-to-stream)
+ [Interactúe con los datos mediante el registro AWS Glue de esquemas](kinesis-integration-glue-schema-registry.md)

## Agregar datos a un flujo
<a name="kinesis-using-sdk-java-add-data-to-stream"></a>

Una vez que se crea una secuencia, puede agregar datos a ella en forma de registros. Un registro es una estructura de datos que contiene los datos que se han de procesar en forma de un blob de datos. Después de almacenar los datos en el registro, Kinesis Data Streams no inspecciona, interpreta ni cambia los datos. Cada registro también tiene asociado un número secuencial y una clave de partición.

Existen dos operaciones diferentes en la API de Kinesis Data Streams que agregan datos a un flujo: [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) y [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html). La operación `PutRecords` envía varios registros a su secuencia por solicitud HTTP, y la operación única `PutRecord` envía registros a su secuencia, una por una (se necesita una solicitud HTTP independiente para cada registro). Es preferible utilizar `PutRecords` para la mayoría de las aplicaciones, ya que conseguirá un mayor rendimiento por productor de datos. Para obtener más información sobre cada una de estas operaciones, consulte las subsecciones independientes que aparecen a continuación.

**Topics**
+ [Añada varios registros con PutRecords](#kinesis-using-sdk-java-putrecords)
+ [Añada un único registro con PutRecord](#kinesis-using-sdk-java-putrecord)

Siempre debe tener en mente que, a medida que la aplicación de origen agrega datos al flujo mediante la API de Kinesis Data Streams, muy probablemente haya una o varias aplicaciones de consumidor que procesan datos simultáneamente desde el flujo. Para obtener información sobre cómo los consumidores obtienen datos mediante la API de Kinesis Data Streams, consulte [Obtener datos de un flujo](developing-consumers-with-sdk.md#kinesis-using-sdk-java-get-data).

**importante**  
[Cambiar el periodo de retención de datos](kinesis-extended-retention.md)

### Añada varios registros con PutRecords
<a name="kinesis-using-sdk-java-putrecords"></a>

La operación [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) envía varios registros a Kinesis Data Streams en una única solicitud. Al utilizar `PutRecords`, los productores pueden conseguir un mayor rendimiento cuando envían datos a los flujos de datos de Kinesis. Cada solicitud `PutRecords` puede admitir hasta 500 registros. Cada registro puede ser tan grande como 1 MB, hasta un límite de 5 MB para toda la solicitud, incluidas las claves de partición. Al igual que con la operación única `PutRecord` que se describe a continuación, `PutRecords` utiliza números secuenciales y claves de partición. Sin embargo, el parámetro de `PutRecord` `SequenceNumberForOrdering` no se incluye en una llamada `PutRecords`. La operación `PutRecords` intenta procesar todos los registros en el orden natural de la solicitud. 

Cada registro de datos tiene un número secuencial único. Kinesis Data Streams asigna el número secuencial cuando se llama a `client.putRecords` para agregar los registros de datos al flujo. Por lo general, los números secuenciales de una misma clave de partición aumentan con el tiempo; cuanto más grande sea el periodo de tiempo transcurrido entre las solicitudes a `PutRecords`, más aumentan los números secuenciales.

**nota**  
Los números secuenciales no se pueden utilizar como índices de conjuntos de datos dentro de la misma secuencia. Para separar lógicamente conjuntos de datos, utilice claves de partición o cree una secuencia independiente para cada conjunto de datos.

Una solicitud `PutRecords` puede incluir registros con diferentes claves de partición. El ámbito de la solicitud es una secuencia; cada solicitud puede incluir cualquier combinación de claves de partición y registros hasta alcanzar los límites de la solicitud. Las solicitudes realizadas con varias claves de partición a secuencias con muchos fragmentos diferentes suelen ser más rápidas que las solicitudes con un número reducido de claves de partición a un número pequeño de fragmentos. El número de claves de partición debe ser mucho mayor que el número de fragmentos a fin de reducir la latencia y maximizar el rendimiento.

#### PutRecords ejemplo
<a name="kinesis-using-sdk-java-putrecords-example"></a>

El siguiente código crea 100 registros de datos con claves de partición secuenciales y los inserta en una secuencia llamada `DataStream`. 

```
        AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
        
        clientBuilder.setRegion(regionName);
        clientBuilder.setCredentials(credentialsProvider);
        clientBuilder.setClientConfiguration(config);
        
        AmazonKinesis kinesisClient = clientBuilder.build();
 
        PutRecordsRequest putRecordsRequest  = new PutRecordsRequest();
        putRecordsRequest.setStreamName(streamName);
        List <PutRecordsRequestEntry> putRecordsRequestEntryList  = new ArrayList<>(); 
        for (int i = 0; i < 100; i++) {
            PutRecordsRequestEntry putRecordsRequestEntry  = new PutRecordsRequestEntry();
            putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes()));
            putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i));
            putRecordsRequestEntryList.add(putRecordsRequestEntry); 
        }

        putRecordsRequest.setRecords(putRecordsRequestEntryList);
        PutRecordsResult putRecordsResult  = kinesisClient.putRecords(putRecordsRequest);
        System.out.println("Put Result" + putRecordsResult);
```

La respuesta `PutRecords` incluye una gama de `Records` de respuesta. Cada registro de la matriz de respuestas se correlaciona directamente con un registro en la matriz de solicitudes siguiendo el orden natural, de arriba abajo de la solicitud y la respuesta. La matriz de `Records` de respuesta siempre incluye el mismo número de registros que la matriz de solicitudes.

#### Maneje las fallas al usar PutRecords
<a name="kinesis-using-sdk-java-putrecords-handling-failures"></a>

De forma predeterminada, el error de registros individuales en una solicitud no para el procesamiento de los registros siguientes en una solicitud `PutRecords`. Esto significa que una matriz de `Records` de respuesta incluye tanto los registros procesados correctamente como los que no. Debe detectar los registros procesados de forma incorrecta e incluirlos en una llamada posterior. 

Los registros correctos incluyen los valores `SequenceNumber` y `ShardID`, y los registros incorrectos incluyen los valores `ErrorCode` y `ErrorMessage`. El parámetro `ErrorCode` refleja el tipo de error y puede tomar uno de los siguientes valores: `ProvisionedThroughputExceededException` o `InternalFailure`. `ErrorMessage` proporciona información más detallada sobre la excepción `ProvisionedThroughputExceededException`, e incluye el ID de la cuenta, el nombre de la secuencia y el ID del fragmento del registro al que se ha aplicado limitación. El ejemplo siguiente tiene tres registros en una solicitud `PutRecords`. El segundo registro ha generado un error y se refleja en la respuesta. 

**Example PutRecords Solicita la sintaxis**  

```
{
    "Records": [
        {
    	"Data": "XzxkYXRhPl8w",
	    "PartitionKey": "partitionKey1"
        },
        {
    	"Data": "AbceddeRFfg12asd",
	    "PartitionKey": "partitionKey1"	
        },
        {
    	"Data": "KFpcd98*7nd1",
	    "PartitionKey": "partitionKey3"
        }
    ],
    "StreamName": "myStream"
}
```

**Example PutRecords Sintaxis de respuesta**  

```
{
    "FailedRecordCount”: 1,
    "Records": [
        {
	    "SequenceNumber": "21269319989900637946712965403778482371",
	    "ShardId": "shardId-000000000001"

        },
        {
	    “ErrorCode":”ProvisionedThroughputExceededException”,
	    “ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111."

        },
        {
	    "SequenceNumber": "21269319989999637946712965403778482985",
	    "ShardId": "shardId-000000000002"
        }
    ]
}
```

Los registros que se procesan sin éxito se pueden incluir en las solicitudes `PutRecords` posteriores. En primer lugar, compruebe el parámetro `FailedRecordCount` en `putRecordsResult` para confirmar si se hay registros con error en la solicitud. En caso afirmativo, cada `putRecordsEntry` que tenga un `ErrorCode` que no sea `null` se debe agregar a una solicitud posterior. Para un ejemplo de este tipo de controlador, consulte el siguiente código.

**Example PutRecords manejador de fallas**  

```
PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
putRecordsRequest.setStreamName(myStreamName);
List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
for (int j = 0; j < 100; j++) {
    PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
    putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(j).getBytes()));
    putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", j));
    putRecordsRequestEntryList.add(putRecordsRequestEntry);
}

putRecordsRequest.setRecords(putRecordsRequestEntryList);
PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest);

while (putRecordsResult.getFailedRecordCount() > 0) {
    final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>();
    final List<PutRecordsResultEntry> putRecordsResultEntryList = putRecordsResult.getRecords();
    for (int i = 0; i < putRecordsResultEntryList.size(); i++) {
        final PutRecordsRequestEntry putRecordRequestEntry = putRecordsRequestEntryList.get(i);
        final PutRecordsResultEntry putRecordsResultEntry = putRecordsResultEntryList.get(i);
        if (putRecordsResultEntry.getErrorCode() != null) {
            failedRecordsList.add(putRecordRequestEntry);
        }
    }
    putRecordsRequestEntryList = failedRecordsList;
    putRecordsRequest.setRecords(putRecordsRequestEntryList);
    putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest);
}
```

### Añada un único registro con PutRecord
<a name="kinesis-using-sdk-java-putrecord"></a>

Cada llamada a [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html) opera sobre un solo registro. Es preferible recurrir a la operación `PutRecords` que se describe en [Añada varios registros con PutRecords](#kinesis-using-sdk-java-putrecords) a menos que su aplicación necesite específicamente enviar siempre registros individuales en cada solicitud, o que por cualquier otro motivo no se pueda utilizar `PutRecords`.

Cada registro de datos tiene un número secuencial único. Kinesis Data Streams asigna el número secuencial cuando se llama a `client.putRecord` para agregar el registro de datos al flujo. Por lo general, los números secuenciales de una misma clave de partición aumentan con el tiempo; cuanto más grande sea el periodo de tiempo transcurrido entre las solicitudes a `PutRecord`, más aumentan los números secuenciales.

 Cuando se producen inserciones (puts) en una sucesión rápida, no hay garantía de que los números secuenciales devueltos aumenten, ya que las operaciones put aparecen esencialmente de manera simultánea a Kinesis Data Streams. Para garantizar unos números secuenciales estrictamente en aumento para la misma clave de partición, utilice el parámetro `SequenceNumberForOrdering` tal y como se muestra en el código de muestra de [PutRecord ejemplo](#kinesis-using-sdk-java-putrecord-example). 

 Independientemente de que use o no `SequenceNumberForOrdering`, los registros que recibe Kinesis Data Streams mediante una llamada a `GetRecords` están estrictamente ordenados por número secuencial. 

**nota**  
Los números secuenciales no se pueden utilizar como índices de conjuntos de datos dentro de la misma secuencia. Para separar lógicamente conjuntos de datos, utilice claves de partición o cree una secuencia independiente para cada conjunto de datos.

Una clave de partición se utiliza para agrupar datos dentro de una secuencia. Un registro de datos se asigna a un fragmento dentro de la secuencia en función de su clave de partición. En concreto, Kinesis Data Streams utiliza la clave de partición como entrada para una función hash que asigna la clave de partición (y los datos asociados) a una partición específica.

 Como resultado de este mecanismo de hash, todos los registros de datos con la misma clave de partición se asignan al mismo fragmento dentro de la secuencia. Sin embargo, si el número de claves de partición supera el número de fragmentos, algunos fragmentos han de contener necesariamente registros con diferentes claves de partición. Desde el punto de vista del diseño, para garantizar que todos los fragmentos estén bien utilizados, el número de fragmentos (especificado mediante el método `setShardCount` de `CreateStreamRequest`) debe ser significativamente inferior al número de claves de partición únicas, y la cantidad de datos que entran en una única clave de partición debe ser significativamente inferior a la capacidad del fragmento. 

#### PutRecord ejemplo
<a name="kinesis-using-sdk-java-putrecord-example"></a>

El siguiente código crea diez registros de datos distribuidos en dos claves de partición y los inserta en una secuencia llamada `myStreamName`.

```
for (int j = 0; j < 10; j++) 
{
  PutRecordRequest putRecordRequest = new PutRecordRequest();
  putRecordRequest.setStreamName( myStreamName );
  putRecordRequest.setData(ByteBuffer.wrap( String.format( "testData-%d", j ).getBytes() ));
  putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 ));  
  putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord );
  PutRecordResult putRecordResult = client.putRecord( putRecordRequest );
  sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber();
}
```

El código de muestra anterior utiliza `setSequenceNumberForOrdering` para garantizar un orden estrictamente creciente en cada clave de partición. Para utilizar este parámetro de forma eficaz, establezca como `SequenceNumberForOrdering` del registro actual (registro *n*) el número secuencial del registro anterior (registro *n-1*). Para obtener el número secuencial de un registro que se ha añadido a la secuencia, llame a `getSequenceNumber` en el resultado de `putRecord`.

El parámetro `SequenceNumberForOrdering` garantiza números de secuencia estrictamente crecientes para la misma clave de partición. `SequenceNumberForOrdering` no permite ordenar los registros en varias claves de partición. 

# Interactúe con los datos mediante el registro AWS Glue de esquemas
<a name="kinesis-integration-glue-schema-registry"></a>

Puede integrar sus flujos de datos de Kinesis con el registro de AWS Glue esquemas. AWS Glue Schema Registry le permite descubrir, controlar y evolucionar de forma centralizada esquemas, además de garantizar que un esquema registrado valide de forma continua los datos generados. Un esquema define la estructura y el formato de un registro de datos. Un esquema es una especificación versionada para publicación, consumo o almacenamiento de confianza de datos. El registro AWS Glue de esquemas le permite mejorar la end-to-end calidad y el gobierno de los datos en sus aplicaciones de streaming. Para obtener más información, consulte [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). Una de las formas de configurar esta integración es mediante Kinesis Data Streams `PutRecords` y `PutRecord` Kinesis Data APIs Streams disponibles en AWS el SDK de Java. 

Para obtener instrucciones detalladas sobre cómo configurar la integración de Kinesis Data Streams con el registro de esquemas mediante Kinesis Data Streams PutRecord y Kinesis, consulte PutRecords la [sección «Interacción con los datos APIs mediante APIs Kinesis Data Streams» en Caso de uso: integración de Amazon Kinesis Data Streams con el registro de esquemas de Glue](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds). AWS 