

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Desarrolle a los consumidores con AWS SDK para Java
<a name="develop-consumers-sdk"></a>

 Puede desarrollar consumidores personalizados mediante Amazon Kinesis Data APIs Streams. En esta sección se describe el uso de Kinesis Data APIs Streams con AWS SDK para Java.

**importante**  
El método recomendado para desarrollar consumidores personalizados de Kinesis Data Streams con rendimiento compartido es utilizar la Bibliotecta de clientes de Kinesis (KCL). KCL ayuda a consumir y procesar los datos de un flujo de datos de Kinesis, ya que se encarga de muchas de las tareas complejas asociadas a la computación distribuida. Para obtener más información, consulte [Desarrollar consumidores con KCL en Java](develop-kcl-consumers-java.md).

**Topics**
+ [Desarrolle consumidores con un rendimiento compartido con AWS SDK para Java](developing-consumers-with-sdk.md)
+ [Desarrolle un mayor número de consumidores con el AWS SDK para Java](building-enhanced-consumers-api.md)
+ [Interactúe con los datos mediante el registro de esquemas AWS Glue](building-enhanced-consumers-glue-schema-registry.md)

# Desarrolle consumidores con un rendimiento compartido con AWS SDK para Java
<a name="developing-consumers-with-sdk"></a>

Uno de los métodos para desarrollar Kinesis Data Streams personalizados para los consumidores con contenido compartido en todo momento consiste en utilizar Amazon Kinesis Data APIs Streams con. AWS SDK para Java En esta sección se describe el uso de Kinesis Data APIs Streams con AWS SDK para Java. Puede llamar a Kinesis Data APIs Streams mediante otros lenguajes de programación diferentes. Para obtener más información sobre todos los productos disponibles AWS SDKs, consulte [Comience a desarrollar con Amazon Web Services](https://aws.amazon.com/developers/getting-started/). 

El código de muestra de Java de esta sección indica cómo realizar operaciones básicas con la API de Kinesis Data Streams, y está dividido lógicamente por tipo de operación. Estos ejemplos no representan código listo para producción. No comprueban todas las excepciones posibles ni tienen en cuenta todas las consideraciones de seguridad y de rendimiento. 

**Topics**
+ [Obtener datos de un flujo](#kinesis-using-sdk-java-get-data)
+ [Usar iteradores de particiones](#kinesis-using-sdk-java-get-data-shard-iterators)
+ [Utilice GetRecords](#kinesis-using-sdk-java-get-data-getrecords)
+ [Adaptarse a una nueva partición](#kinesis-using-sdk-java-get-data-reshard)

## Obtener datos de un flujo
<a name="kinesis-using-sdk-java-get-data"></a>

Los Kinesis Data APIs Streams incluyen `getShardIterator` los métodos `getRecords` y que puede invocar para recuperar registros de un flujo de datos. Se trata del modelo de extracción, donde el código extrae registros de datos directamente de las particiones del flujo de datos.

**importante**  
Se recomienda utilizar la funcionalidad de procesador de registros que proporciona KCL para recuperarlos de los flujos de datos. Se trata del modelo de inserción, en el que debe implementar el código que procesa los datos. KCL recupera los registros de datos del flujo de datos y los entrega al código de la aplicación. Además, KCL proporciona funciones de recuperación, conmutación por error y equilibrio de carga. Para obtener más información, consulte [Desarrollo de consumidores personalizados con rendimiento compartido mediante KCL](https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html).

Sin embargo, en algunos casos puede que prefiera utilizar Kinesis Data APIs Streams. Por ejemplo, para implementar herramientas personalizadas para la supervisión o la depuración de los flujos de datos.

**importante**  
Kinesis Data Streams admite cambios en el periodo de retención de los registros de datos del flujo de datos. Para obtener más información, consulte [Cambiar el periodo de retención de datos](kinesis-extended-retention.md).

## Usar iteradores de particiones
<a name="kinesis-using-sdk-java-get-data-shard-iterators"></a>

Puede recuperar registros desde la secuencia por fragmentos. Para cada fragmento y cada lote de registros obtenido de ese fragmento debe conseguir un *iterador de fragmentos*. El iterador de fragmentos se utiliza en el objeto `getRecordsRequest` para especificar el fragmento a partir del cual deben recuperarse los registros. El tipo asociado con el iterador de fragmentos determina el punto del fragmento a partir del cual deben recuperarse los registros (consulte la información que se incluye más adelante en esta sección para obtener más detalles). Antes de trabajar con el iterador de particiones, tendrá que recuperar la partición. Para obtener más información, consulte [Obtener lista de particiones](kinesis-using-sdk-java-list-shards.md).

Obtenga el iterador de fragmentos inicial con el método `getShardIterator`. Obtenga iteradores de fragmentos para lotes adicionales de registros utilizando el método `getNextShardIterator` del objeto `getRecordsResult` que devuelve el método `getRecords`. Un iterador de fragmentos es válido durante 5 minutos. Si utiliza un iterador de fragmentos mientras sea válido, obtendrá uno nuevo. Cada iterador de fragmentos mantiene su validez durante 5 minutos, incluso después de utilizarlo.

Para obtener el iterador de fragmentos inicial, cree instancias de `GetShardIteratorRequest` y páselas al método `getShardIterator`. Para configurar la solicitud, especifique la secuencia y el ID del fragmento. Para obtener información sobre cómo obtener las transmisiones de su AWS cuenta, consulte[Lista de secuencias](kinesis-using-sdk-java-list-streams.md). Para obtener información sobre cómo obtener los fragmentos en una secuencia, consulte [Obtener lista de particiones](kinesis-using-sdk-java-list-shards.md).

```
String shardIterator;
GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
getShardIteratorRequest.setStreamName(myStreamName);
getShardIteratorRequest.setShardId(shard.getShardId());
getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON");

GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest);
shardIterator = getShardIteratorResult.getShardIterator();
```

Este código de muestra especifica `TRIM_HORIZON` como el tipo de iterador que se utiliza para obtener el iterador de fragmentos inicial. Este tipo de iterador implica que se deben devolver los registros y comenzar por el primer registro agregado a la partición, en lugar de comenzar por el registro agregado más recientemente, también denominado *extremo*. Los tipos de iteradores posibles son los siguientes:
+ `AT_SEQUENCE_NUMBER`
+ `AFTER_SEQUENCE_NUMBER`
+ `AT_TIMESTAMP`
+ `TRIM_HORIZON`
+ `LATEST`

Para obtener más información, consulte [ShardIteratorType](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#Kinesis-GetShardIterator-request-ShardIteratorType).

Algunos tipos de iteradores requieren que se especifique un número de secuencia además del tipo, por ejemplo:

```
getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER");
getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);
```

Después de obtener un registro mediante `getRecords`, puede obtener el número de secuencia del registro si llama al método `getSequenceNumber` del registro. 

```
record.getSequenceNumber()
```

Además, el código que añade registros a la secuencia de datos puede obtener el número de secuencia para un registro añadido llamando a `getSequenceNumber` en el resultado de `putRecord`. 

```
lastSequenceNumber = putRecordResult.getSequenceNumber();
```

Puede utilizar números secuenciales para garantizar que los registros tengan un orden estrictamente ascendente. Para obtener más información, consulte el código de ejemplo en [PutRecord ejemplo](developing-producers-with-sdk.md#kinesis-using-sdk-java-putrecord-example).

## Utilice GetRecords
<a name="kinesis-using-sdk-java-get-data-getrecords"></a>

Una vez que haya obtenido el iterador de fragmentos, cree una instancia de un objeto `GetRecordsRequest`. Especifique el iterador para la solicitud con el método `setShardIterator`. 

También puede establecer el número de registros que quiera recuperar mediante el método `setLimit`. El número de registros que devuelve `getRecords` es siempre igual o inferior a este límite. Si no especifica este límite, `getRecords` devuelve 10 MB de registros recuperados. El código de muestra que aparece a continuación establece este límite en 25 registros.

Si no se devuelven, significa que no hay registros de datos disponibles actualmente en este fragmento con el número de secuencia al que hace referencia el iterador de fragmentos. En una situación así, la aplicación debe esperar una cantidad de tiempo adecuada para los orígenes de datos del flujo. Intente obtener datos de nuevo a partir del fragmento mediante el iterador de fragmentos que ha devuelto la llamada anterior a `getRecords`. 

Pase la `getRecordsRequest` al método `getRecords` y capture el valor devuelto como un objeto `getRecordsResult`. Para obtener los registros de datos, llame al método `getRecords` en el objeto `getRecordsResult`. 

```
GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
getRecordsRequest.setShardIterator(shardIterator);
getRecordsRequest.setLimit(25);

GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest);
List<Record> records = getRecordsResult.getRecords();
```

Para prepararse para otra llamada a `getRecords`, obtenga el siguiente iterador de fragmentos desde `getRecordsResult`. 

```
shardIterator = getRecordsResult.getNextShardIterator();
```

Para obtener resultados óptimos, suspenda la actividad durante al menos 1 segundo (1000 milisegundos) entre las llamadas a `getRecords` para evitar que se supere el límite de frecuencia de `getRecords`. 

```
try {
  Thread.sleep(1000);
}
catch (InterruptedException e) {}
```

Normalmente, debe llamar a `getRecords` en bucle, incluso cuando recupere un solo registro en un entorno de pruebas. Una única llamada a `getRecords` podría devolver una lista de registros vacía, incluso si el fragmento contiene más registros en números secuenciales posteriores. Si ocurre esto, el `NextShardIterator` que se devuelve junto con la lista de registros vacía hace referencia a un número de secuencia posterior en el fragmento, y las llamadas sucesivas a `getRecords` acabarán por devolver los registros. El siguiente ejemplo ilustra el uso de un bucle.

**Ejemplo: getRecords**  
El siguiente ejemplo de código refleja las sugerencias sobre `getRecords` que hemos planteado en esta sección, incluidas la realización de llamadas en bucle.

```
// Continuously read data records from a shard
List<Record> records;
    
while (true) {
   
  // Create a new getRecordsRequest with an existing shardIterator 
  // Set the maximum records to return to 25
  
  GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
  getRecordsRequest.setShardIterator(shardIterator);
  getRecordsRequest.setLimit(25); 

  GetRecordsResult result = client.getRecords(getRecordsRequest);
  
  // Put the result into record list. The result can be empty.
  records = result.getRecords();
  
  try {
    Thread.sleep(1000);
  } 
  catch (InterruptedException exception) {
    throw new RuntimeException(exception);
  }
  
  shardIterator = result.getNextShardIterator();
}
```

Si utiliza Kinesis Client Library, esta podría hacer varias llamadas antes de devolver los datos. Este es el comportamiento intencionado según el diseño y no indica ningún problema con KCL o los datos.

## Adaptarse a una nueva partición
<a name="kinesis-using-sdk-java-get-data-reshard"></a>

 Si `getRecordsResult.getNextShardIterator` devuelve `null`, indica que se ha producido una división o combinación de una partición que ha implicado esta partición. Esta partición se encuentra ahora en un estado `CLOSED` y se han leído todos los registros de datos disponibles de esta partición. 

 En este escenario, puede utilizar `getRecordsResult.childShards` para obtener información sobre las nuevas particiones secundarias de la partición que se procesa y que se crearon mediante la división o la combinación. Para obtener más información, consulte [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html).

 En el caso de una división, los dos nuevos fragmentos tienen un `parentShardId` igual al ID de fragmento del fragmento que estuviera procesando anteriormente. El valor de `adjacentParentShardId` para ambos fragmentos es `null`. 

 En el caso de una fusión, el único fragmento nuevo creado por la fusión tiene un `parentShardId` igual al ID del fragmento de uno de los fragmentos de origen y un `adjacentParentShardId` igual al ID de fragmento del otro fragmento de origen. La aplicación ya ha leído todos los datos de uno de estos fragmentos. Este es el fragmento para el que `getRecordsResult.getNextShardIterator` ha devuelto `null`. Si el orden de los datos es importante en la aplicación, debe asegurarse de que esta también lea todos los datos del otro fragmento principal antes de leer datos nuevos del fragmento secundario creado por la fusión. 

 Si utiliza varios procesadores para recuperar los datos de la secuencia (por ejemplo, un procesador por fragmento) y se produce una división o fusión de fragmentos, debe aumentar o disminuir el número de procesadores para adaptarse a los cambios en el número de fragmentos. 

 Para obtener más información acerca de cómo realizar cambios en los fragmentos, incluida una explicación de los estados de los fragmentos, como `CLOSED`, consulte [Cambiar las particiones de un flujo](kinesis-using-sdk-java-resharding.md). 

# Desarrolle un mayor número de consumidores con el AWS SDK para Java
<a name="building-enhanced-consumers-api"></a>

La *distribución ramificada mejorada* es una característica de Amazon Kinesis Data Streams que permite a los consumidores recibir registros de un flujo de datos con un rendimiento dedicado de hasta 2 MB de datos por segundo por partición. Un consumidor que utiliza la distribución ramificada mejorada no tiene que competir con otros consumidores que reciben datos de la secuencia. Para obtener más información, consulte [Desarrollo de consumidores de distribución ramificada mejorada con rendimiento dedicado](enhanced-consumers.md).

Puede utilizar las operaciones de la API para crear un consumidor que utilice la distribución ramificada mejorada en Kinesis Data Streams.

**Para registrar un consumidor con distribución ramificada mejorada mediante la API de Kinesis Data Streams**

1. Llame [RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html)para registrar su solicitud como consumidor que utiliza un sistema de distribución ampliado. Kinesis Data Streams genera un nombre de recurso de Amazon (ARN) para el consumidor y lo devuelve en la respuesta.

1. Para empezar a escuchar un fragmento específico, pasa el ARN del consumidor en una llamada a. [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html) A continuación, Kinesis Data Streams comienza a enviarle los registros de ese fragmento, en forma de eventos de [SubscribeToShardEvent](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShardEvent.html)tipo a través de una conexión HTTP/2. La conexión permanece abierta durante un máximo de 5 minutos. [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)Vuelva a llamar si quiere seguir recibiendo los registros del fragmento una vez `future` que la llamada devuelva y se [SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)complete de forma normal o excepcional.
**nota**  
La API `SubscribeToShard` también devuelve la lista de las particiones secundarias de la partición actual cuando se alcanza el final de la partición actual. 

1. Para anular el registro de un consumidor que utiliza la función de distribución mejorada, llama al teléfono. [DeregisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeregisterStreamConsumer.html)

El código siguiente es un ejemplo de cómo suscribir el consumidor a un fragmento, renovar la suscripción de forma periódica y controlar los eventos.

```
    import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
    import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
    import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
     
    import java.util.concurrent.CompletableFuture;
     
    /**
     * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java
     * for complete code and more examples.
     */
    public class SubscribeToShardSimpleImpl {
     
        private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737";
        private static final String SHARD_ID = "shardId-000000000000";
     
        public static void main(String[] args) {
     
            KinesisAsyncClient client = KinesisAsyncClient.create();
     
            SubscribeToShardRequest request = SubscribeToShardRequest.builder()
                    .consumerARN(CONSUMER_ARN)
                    .shardId(SHARD_ID)
                    .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build();
     
            // Call SubscribeToShard iteratively to renew the subscription periodically.
            while(true) {
                // Wait for the CompletableFuture to complete normally or exceptionally.
                callSubscribeToShardWithVisitor(client, request).join();
            }
     
            // Close the connection before exiting.
            // client.close();
        }
     
     
        /**
         * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface.
         */
        private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) {
            SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() {
                @Override
                public void visit(SubscribeToShardEvent event) {
                    System.out.println("Received subscribe to shard event " + event);
                }
            };
            SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
                    .builder()
                    .onError(t -> System.err.println("Error during stream - " + t.getMessage()))
                    .subscriber(visitor)
                    .build();
            return client.subscribeToShard(request, responseHandler);
        }
    }
```

 Si `event.ContinuationSequenceNumber` devuelve `null`, indica que se ha producido una división o combinación de una partición que ha implicado esta partición. Esta partición se encuentra ahora en un estado `CLOSED`, y se han leído todos los registros de datos disponibles de esta partición. En este escenario, según el ejemplo anterior, puede utilizar `event.childShards` para obtener información sobre las nuevas particiones secundarias de la partición que se procesa y que se crearon mediante la división o la combinación. Para obtener más información, consulte [ChildShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ChildShard.html).

# Interactúe con los datos mediante el registro de esquemas AWS Glue
<a name="building-enhanced-consumers-glue-schema-registry"></a>

Puede integrar sus flujos de datos de Kinesis con el registro de AWS Glue esquemas. AWS Glue Schema Registry le permite descubrir, controlar y evolucionar de forma centralizada esquemas, además de garantizar que un esquema registrado valide de forma continua los datos generados. Un esquema define la estructura y el formato de un registro de datos. Un esquema es una especificación versionada para publicación, consumo o almacenamiento de confianza de datos. El registro de AWS Glue esquemas le permite mejorar la end-to-end calidad y el gobierno de los datos en sus aplicaciones de streaming. Para obtener más información, consulte [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). Una de las formas de configurar esta integración es mediante la API de `GetRecords` Kinesis Data Streams, disponible en AWS el SDK de Java. 

Para obtener instrucciones detalladas sobre cómo configurar la integración de Kinesis Data Streams con Schema Registry mediante Kinesis Data APIs Streams, consulte `GetRecords` la [sección «Interacción con datos mediante Kinesis Data Streams» en Caso de uso: integración de Amazon Kinesis APIs Data Streams con el registro de Glue](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds) Schema. AWS 