Implemente el consumidor - Amazon Kinesis Data Streams

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Implemente el consumidor

La aplicación consumidora del Tutorial: Procese datos bursátiles en tiempo real con KPL and KCL 1.x procesa de forma continua la secuencia de operaciones bursátiles que se ha creado en Implemente el productor. A continuación, genera como resultado las acciones que más se compran y venden cada minuto. La aplicación se basa en la biblioteca de clientes de Kinesis (KCL), que realiza gran parte del trabajo pesado que suelen realizar las aplicaciones de consumo. Para obtener más información, consulte Desarrolle KCL consumidores 1.x.

Consulte el código fuente y revise la siguiente información.

StockTradesProcessor clase

Clase principal del consumidor, que proporcionamos por usted y realiza las siguientes tareas:

  • Lee los nombres de aplicación, secuencia y región que se pasan como argumentos.

  • Lee las credenciales de ~/.aws/credentials.

  • Crea una instancia de RecordProcessorFactory que sirve instancias de RecordProcessor, implementadas por una instancia de StockTradeRecordProcessor.

  • Crea un servidor KCL con la RecordProcessorFactory instancia y una configuración estándar que incluye el nombre del flujo, las credenciales y el nombre de la aplicación.

  • El proceso de trabajo crea un subproceso nuevo para cada partición (asignado a esta instancia del consumidor), que se ejecuta en bucle continuamente para leer registros de Kinesis Data Streams. A continuación, invoca a la instancia de RecordProcessor para procesar cada lote de registros recibido.

StockTradeRecordProcessor clase

Implementación de la instancia RecordProcessor, que a su vez implementa tres métodos necesarios: initialize, processRecords y shutdown.

Como indican sus nombres, initialize y shutdown se utilizan en Kinesis Client Library para que el procesador de registros sepa cuándo debe estar listo para empezar a recibir registros y cuándo debe esperar dejar de recibirlos, respectivamente, de modo que pueda realizar cualquier tarea de configuración y finalización específica de la aplicación. Le proporcionamos el código de los mismos. El procesamiento principal sucede en el método processRecords, que a su vez utiliza processRecord para cada registro. Este último método se proporciona principalmente como código estructural prácticamente vacío, para que pueda implementarlo en el siguiente paso, donde se explica más a fondo.

También hay que resaltar la implementación de métodos de apoyo para processRecord: reportStatsy resetStats, que están vacíos en el código fuente original.

El método processRecords se implementa por usted, y realiza los pasos siguientes:

  • Para cada registro que se pase, llama a processRecord.

  • Si ha pasado al menos 1 minuto desde el último informe, llama a reportStats(), que imprime las últimas estadísticas y, a continuación, a resetStats(), que elimina las estadísticas para que el próximo intervalo incluya solo registros nuevos.

  • Establece el momento del siguiente informe.

  • Si ha transcurrido al menos un minuto desde el último punto de comprobación de la base de datos, llama a checkpoint().

  • Establece el momento del siguiente punto de comprobación.

Este método utiliza intervalos de 60 segundos para la velocidad de elaboración de informes y puntos de comprobación. Para obtener más información sobre los puntos de control, consulte Información adicional sobre el consumidor.

StockStats clase

Esta clase proporciona retención de datos y seguimiento de estadísticas para las acciones más populares a lo largo del tiempo. Proporcionamos este código por usted y contiene los siguientes métodos:

  • addStockTrade(StockTrade): inserta el StockTrade dado en las estadísticas de ejecución.

  • toString(): devuelve las estadísticas en una cadena con formato.

Esta clase realiza un seguimiento de las acciones más populares manteniendo un recuento continuo del número total de operaciones de cada acción y del recuento máximo. Asimismo, actualiza estos recuentos cada vez que llega una operación nueva.

Agregar código para los métodos de la clase StockTradeRecordProcessor, tal y como se muestra en los pasos siguientes.

Para implementar el consumidor
  1. Implemente el método processRecord creando una instancia de un objeto StockTrade con el tamaño correcto y añadiéndole los datos de registro, registrando una advertencia si hay algún problema.

    StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array()); if (trade == null) { LOG.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.getPartitionKey()); return; } stockStats.addStockTrade(trade);
  2. Implemente un método reportStats sencillo. Si lo desea, puede modificar el formato de salida según sus preferencias.

    System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n");
  3. Por último, implemente el método resetStats, lo que creará una nueva instancia de stockStats.

    stockStats = new StockStats();
Para ejecutar el consumidor
  1. Ejecute el productor que escribió en Implemente el productor para insertar registros de operaciones bursátiles simuladas en la secuencia.

  2. Compruebe que la clave de acceso y la clave secreta recuperadas anteriormente (al crear el IAM usuario) estén guardadas en el archivo~/.aws/credentials.

  3. Ejecute la clase StockTradesProcessor con los siguientes argumentos:

    StockTradesProcessor StockTradeStream us-west-2

    Tenga en cuenta que si ha creado su secuencia en una región diferente a us-west-2 tiene que especificar esa región aquí.

Después de un minuto, debería ver un resultado similar a este, actualizado cada minuto:

****** Shard shardId-000000000001 stats for last 1 minute ****** Most popular stock being bought: WMT, 27 buys. Most popular stock being sold: PTR, 14 sells. ****************************************************************

Información adicional sobre el consumidor

Si está familiarizado con las ventajas de Kinesis Client Library, que se tratan en Desarrolle KCL consumidores 1.x y en otros artículos, es posible que se pregunte por qué debería utilizarla aquí. Aunque solo utilice una secuencia de fragmentos y una sola instancia de consumidor para procesarla, es aún más fácil implementarla para el consumidor mediante. KCL Compare los pasos de implementación del código en la sección del productor con la del consumidor, y podrá ver en comparación la facilidad de implementar un consumidor. Esto se debe en gran parte a los servicios que KCL ofrece.

En esta aplicación, se centrará en la implementación de una clase de procesador de registros que puede procesar registros individuales. No tiene que preocuparse por la forma en que se obtienen los registros de Kinesis Data Streams; busca KCL los registros e invoca al procesador de registros siempre que haya nuevos registros disponibles. Tampoco tiene que preocuparse del número de fragmentos o instancias del consumidor que hay. Si la secuencia se amplía, no será necesario volver a escribir su aplicación para administrar más de un fragmento o una instancia del consumidor.

El término puntos de control significa registrar el punto de la transmisión hasta los registros de datos que se han consumido y procesado hasta el momento. Si la aplicación se bloquea, la transmisión se lee desde ese punto y no desde el principio de la transmisión. El tema de los puntos de comprobación, los diversos patrones de diseño y las prácticas recomendadas al respecto quedan fuera del ámbito de este capítulo. Sin embargo, es algo que puede encontrar en entornos de producción.

Como aprendió enImplemente el productor, las put operaciones de Kinesis Data API Streams utilizan una clave de partición como entrada. Kinesis Data Streams utiliza una clave de partición como mecanismo para dividir los registros en varias particiones (cuando hay más de una partición en el flujo). La misma clave de partición siempre se dirige al mismo fragmento. Esto permite que el consumidor que procesa un determinado fragmento se diseñe con el supuesto de que los registros con la misma clave de partición solo se envían a ese consumidor, y que ningún registro con la misma clave de partición acaba en ningún otro consumidor. Por lo tanto, un proceso de trabajo del consumidor puede agregar todos los registros con la misma clave de partición sin preocuparse de que podrían faltar datos necesarios.

En esta aplicación, el procesamiento de registros por parte del consumidor no es intensivo, por lo que puede utilizar un fragmento y realizar el procesamiento en el mismo subproceso que el subproceso. KCL En la práctica, sin embargo, piense primero en aumentar el número de fragmentos. En algunos casos, es posible que desee cambiar el procesamiento a otro subproceso o utilizar un grupo de subprocesos si espera que el procesamiento de registros sea intenso. De esta forma, KCL pueden buscar nuevos registros más rápidamente, mientras que los otros subprocesos pueden procesar los registros en paralelo. El diseño multiproceso no es trivial y debe abordarse con técnicas avanzadas, por lo que aumentar el número de fragmentos suele ser la forma más eficaz de ampliarlo.

Siguientes pasos

(Opcional) Amplíe el número de consumidores