Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Implemente el consumidor
La aplicación consumidora del Tutorial: Procese datos bursátiles en tiempo real con KPL and KCL 1.x procesa de forma continua la secuencia de operaciones bursátiles que se ha creado en Implemente el productor. A continuación, genera como resultado las acciones que más se compran y venden cada minuto. La aplicación se basa en la biblioteca de clientes de Kinesis (KCL), que realiza gran parte del trabajo pesado que suelen realizar las aplicaciones de consumo. Para obtener más información, consulte Desarrolle KCL consumidores 1.x.
Consulte el código fuente y revise la siguiente información.
- StockTradesProcessor clase
-
Clase principal del consumidor, que proporcionamos por usted y realiza las siguientes tareas:
-
Lee los nombres de aplicación, secuencia y región que se pasan como argumentos.
-
Lee las credenciales de
~/.aws/credentials
. -
Crea una instancia de
RecordProcessorFactory
que sirve instancias deRecordProcessor
, implementadas por una instancia deStockTradeRecordProcessor
. -
Crea un servidor KCL con la
RecordProcessorFactory
instancia y una configuración estándar que incluye el nombre del flujo, las credenciales y el nombre de la aplicación. -
El proceso de trabajo crea un subproceso nuevo para cada partición (asignado a esta instancia del consumidor), que se ejecuta en bucle continuamente para leer registros de Kinesis Data Streams. A continuación, invoca a la instancia de
RecordProcessor
para procesar cada lote de registros recibido.
-
- StockTradeRecordProcessor clase
-
Implementación de la instancia
RecordProcessor
, que a su vez implementa tres métodos necesarios:initialize
,processRecords
yshutdown
.Como indican sus nombres,
initialize
yshutdown
se utilizan en Kinesis Client Library para que el procesador de registros sepa cuándo debe estar listo para empezar a recibir registros y cuándo debe esperar dejar de recibirlos, respectivamente, de modo que pueda realizar cualquier tarea de configuración y finalización específica de la aplicación. Le proporcionamos el código de los mismos. El procesamiento principal sucede en el métodoprocessRecords
, que a su vez utilizaprocessRecord
para cada registro. Este último método se proporciona principalmente como código estructural prácticamente vacío, para que pueda implementarlo en el siguiente paso, donde se explica más a fondo.También hay que resaltar la implementación de métodos de apoyo para
processRecord
:reportStats
yresetStats
, que están vacíos en el código fuente original.El método
processRecords
se implementa por usted, y realiza los pasos siguientes:-
Para cada registro que se pase, llama a
processRecord
. -
Si ha pasado al menos 1 minuto desde el último informe, llama a
reportStats()
, que imprime las últimas estadísticas y, a continuación, aresetStats()
, que elimina las estadísticas para que el próximo intervalo incluya solo registros nuevos. -
Establece el momento del siguiente informe.
-
Si ha transcurrido al menos un minuto desde el último punto de comprobación de la base de datos, llama a
checkpoint()
. -
Establece el momento del siguiente punto de comprobación.
Este método utiliza intervalos de 60 segundos para la velocidad de elaboración de informes y puntos de comprobación. Para obtener más información sobre los puntos de control, consulte Información adicional sobre el consumidor.
-
- StockStats clase
-
Esta clase proporciona retención de datos y seguimiento de estadísticas para las acciones más populares a lo largo del tiempo. Proporcionamos este código por usted y contiene los siguientes métodos:
-
addStockTrade(StockTrade)
: inserta elStockTrade
dado en las estadísticas de ejecución. -
toString()
: devuelve las estadísticas en una cadena con formato.
Esta clase realiza un seguimiento de las acciones más populares manteniendo un recuento continuo del número total de operaciones de cada acción y del recuento máximo. Asimismo, actualiza estos recuentos cada vez que llega una operación nueva.
-
Agregar código para los métodos de la clase StockTradeRecordProcessor
, tal y como se muestra en los pasos siguientes.
Para implementar el consumidor
-
Implemente el método
processRecord
creando una instancia de un objetoStockTrade
con el tamaño correcto y añadiéndole los datos de registro, registrando una advertencia si hay algún problema.StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array()); if (trade == null) { LOG.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.getPartitionKey()); return; } stockStats.addStockTrade(trade);
-
Implemente un método
reportStats
sencillo. Si lo desea, puede modificar el formato de salida según sus preferencias.System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n");
-
Por último, implemente el método
resetStats
, lo que creará una nueva instancia destockStats
.stockStats = new StockStats();
Para ejecutar el consumidor
-
Ejecute el productor que escribió en Implemente el productor para insertar registros de operaciones bursátiles simuladas en la secuencia.
-
Compruebe que la clave de acceso y la clave secreta recuperadas anteriormente (al crear el IAM usuario) estén guardadas en el archivo
~/.aws/credentials
. -
Ejecute la clase
StockTradesProcessor
con los siguientes argumentos:StockTradesProcessor StockTradeStream us-west-2
Tenga en cuenta que si ha creado su secuencia en una región diferente a
us-west-2
tiene que especificar esa región aquí.
Después de un minuto, debería ver un resultado similar a este, actualizado cada minuto:
****** Shard shardId-000000000001 stats for last 1 minute ******
Most popular stock being bought: WMT, 27 buys.
Most popular stock being sold: PTR, 14 sells.
****************************************************************
Información adicional sobre el consumidor
Si está familiarizado con las ventajas de Kinesis Client Library, que se tratan en Desarrolle KCL consumidores 1.x y en otros artículos, es posible que se pregunte por qué debería utilizarla aquí. Aunque solo utilice una secuencia de fragmentos y una sola instancia de consumidor para procesarla, es aún más fácil implementarla para el consumidor mediante. KCL Compare los pasos de implementación del código en la sección del productor con la del consumidor, y podrá ver en comparación la facilidad de implementar un consumidor. Esto se debe en gran parte a los servicios que KCL ofrece.
En esta aplicación, se centrará en la implementación de una clase de procesador de registros que puede procesar registros individuales. No tiene que preocuparse por la forma en que se obtienen los registros de Kinesis Data Streams; busca KCL los registros e invoca al procesador de registros siempre que haya nuevos registros disponibles. Tampoco tiene que preocuparse del número de fragmentos o instancias del consumidor que hay. Si la secuencia se amplía, no será necesario volver a escribir su aplicación para administrar más de un fragmento o una instancia del consumidor.
El término puntos de control significa registrar el punto de la transmisión hasta los registros de datos que se han consumido y procesado hasta el momento. Si la aplicación se bloquea, la transmisión se lee desde ese punto y no desde el principio de la transmisión. El tema de los puntos de comprobación, los diversos patrones de diseño y las prácticas recomendadas al respecto quedan fuera del ámbito de este capítulo. Sin embargo, es algo que puede encontrar en entornos de producción.
Como aprendió enImplemente el productor, las put
operaciones de Kinesis Data API Streams utilizan una clave de partición como entrada. Kinesis Data Streams utiliza una clave de partición como mecanismo para dividir los registros en varias particiones (cuando hay más de una partición en el flujo). La misma clave de partición siempre se dirige al mismo fragmento. Esto permite que el consumidor que procesa un determinado fragmento se diseñe con el supuesto de que los registros con la misma clave de partición solo se envían a ese consumidor, y que ningún registro con la misma clave de partición acaba en ningún otro consumidor. Por lo tanto, un proceso de trabajo del consumidor puede agregar todos los registros con la misma clave de partición sin preocuparse de que podrían faltar datos necesarios.
En esta aplicación, el procesamiento de registros por parte del consumidor no es intensivo, por lo que puede utilizar un fragmento y realizar el procesamiento en el mismo subproceso que el subproceso. KCL En la práctica, sin embargo, piense primero en aumentar el número de fragmentos. En algunos casos, es posible que desee cambiar el procesamiento a otro subproceso o utilizar un grupo de subprocesos si espera que el procesamiento de registros sea intenso. De esta forma, KCL pueden buscar nuevos registros más rápidamente, mientras que los otros subprocesos pueden procesar los registros en paralelo. El diseño multiproceso no es trivial y debe abordarse con técnicas avanzadas, por lo que aumentar el número de fragmentos suele ser la forma más eficaz de ampliarlo.
Siguientes pasos
(Opcional) Amplíe el número de consumidores