Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Implementar el productor
La aplicación del Tutorial: Procesar operaciones bursátiles en tiempo real con KPL y KCL 1.x utiliza un escenario real de monitorización del mercado bursátil. Los siguientes principios explicar brevemente la forma en que este escenario se asocia con la estructura del productor y el código de apoyo.
Consulte el código fuente y revise la siguiente información.
- Clase StockTrade
-
Una operación bursátil está representada por una instancia de la clase
StockTrade
. Esta instancia contiene atributos como el símbolo de cotización, el precio, el número de acciones, el tipo de transacción (compra o venta) y un ID que identifica de forma exclusiva la transacción. Esta clase se implementa por usted. - Registro de la secuencia
-
Una secuencia es una serie de registros. Un registro es la sucesión en serie de una instancia de
StockTrade
en formato JSON. Por ejemplo:{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
- Clase StockTradeGenerator
-
StockTradeGenerator
tiene un método denominadogetRandomTrade()
que proporciona una nueva operación bursátil generada aleatoriamente cada vez que se invoca. Esta clase se implementa por usted. - Clase StockTradesWriter
-
El método
main
del productor,StockTradesWriter
recupera continuamente una operación aleatoria y luego la envía a Kinesis Data Streams mediante la ejecución de las siguientes tareas:-
Lee los nombres de la secuencia y la región como entrada.
-
Crea un
AmazonKinesisClientBuilder
. -
Utiliza el compilador de clientes para establecer la región, las credenciales y la configuración de cliente.
-
Crea un cliente de
AmazonKinesis
mediante el compilador de clientes. -
Comprueba que la secuencia existe y está activa (si no, sale con un error).
-
En un bucle continuo, llama al método
StockTradeGenerator.getRandomTrade()
y después al métodosendStockTrade
para enviar la transacción a la secuencia cada 100 milisegundos.
El método
sendStockTrade
de la claseStockTradesWriter
tiene el siguiente código:private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest putRecord = new PutRecordRequest(); putRecord.setStreamName(streamName); // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes)); try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); } }
Consulte el siguiente desglose del código:
-
La API de
PutRecord
espera una matriz de bytes, y debe convertir eltrade
a formato JSON. Esta única línea de código realiza esa operación:byte[] bytes = trade.toJsonAsBytes();
-
Antes de poder enviar la transacción, debe crear una nueva instancia de
PutRecordRequest
(denominadaputRecord
en este caso):PutRecordRequest putRecord = new PutRecordRequest();
Cada llamada a
PutRecord
requiere el nombre de la secuencia, la clave de partición y el blob de datos. El siguiente código rellenará estos campos en el objetoputRecord
utilizando sus métodossetXxxx()
:putRecord.setStreamName(streamName); putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes));
El ejemplo utiliza un ticker como clave de partición, que asigna el registro a una partición específica. En la práctica, debería tener cientos o miles de claves de partición por fragmento, de forma que los registros se dispersen de forma uniforme en toda la secuencia. Para obtener más información acerca de cómo agregar datos a una secuencia, consulte Agregar datos a un flujo.
Ahora
putRecord
estará listo para el envío al cliente (la operaciónput
):kinesisClient.putRecord(putRecord);
-
Siempre es útil agregar funciones de comprobación y registro de errores. Este código registra las condiciones de error:
if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }
Agregue el bloque try/catch a la operación
put
:try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); }
Esto se debe a que una operación
put
de Kinesis Data Streams puede fallar debido a un error de red o debido a que el flujo alcanza sus límites de rendimiento y se ve limitado. Le recomendamos comprobar detalladamente su política de reintentos para las operaciones deput
de modo que evite pérdida de datos, por ejemplo, al utilizar un reintento. -
El registro de estado resulta útil, pero es opcional:
LOG.info("Putting trade: " + trade.toString());
El productor que se muestra aquí utiliza la funcionalidad de registro único de la API de Kinesis Data Streams,
PutRecord
. En la práctica, si un solo productor genera una gran cantidad de registros, suele ser más eficaz utilizar la funcionalidad de varios registros dePutRecords
y enviar lotes de registros de una vez. Para obtener más información, consulte Agregar datos a un flujo. -
Para ejecutar el productor
-
Compruebe que la clave de acceso y el par de claves secretas recuperadas anteriormente (al crear el usuario de IAM) se guardaron en el archivo
~/.aws/credentials
. -
Ejecute la clase
StockTradeWriter
con los siguientes argumentos:StockTradeStream us-west-2
Si ha creado su secuencia en una región diferente a
us-west-2
tendrá que especificar esa región aquí.
Debería ver una salida similar a esta:
Feb 16, 2015 3:53:00 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
El flujo de operaciones bursátiles está siendo adquirido por Kinesis Data Streams.