var dgram = require('dgram');
var batcher = require('atomic-batcher');
var logger = require('./logger');
var PROTOCOL_HEADER = '{"format":"json","version":1}';
var PROTOCOL_DELIMITER = '\n';
/**
* Sends a collection of data over a UDP socket. This method
* is designed to be used by `atomic-batcher` as a way to share
* a single UDP socket for sending multiple data blocks.
*
* @param {object} ops - Details of the data to send
* @param {Function} callback - The function to call when done
*/
function batchSendData (ops, callback) {
var client = dgram.createSocket('udp4');
executeSendData(client, ops, 0, function () {
try {
client.close();
} finally {
callback();
}
});
}
/**
* Execute sending data starting at the specified index and
* using the provided client.
*
* @param {Socket} client - Socket to send data with
* @param {object} ops - Details of data to send
* @param {number} index - Starting index for sending
* @param {Function} callback - Function to call when done
*/
function executeSendData (client, ops, index, callback) {
if (index >= ops.length) {
callback();
return;
}
sendMessage(client, ops[index], function () {
executeSendData(client, ops, index+1, callback);
});
}
/**
* Send a single message over a UDP socket.
*
* @param {Socket} client - Socket to send data with
* @param {object} data - Details of the data to send
* @param {Function} batchCallback - Function to call when done
*/
function sendMessage (client, data, batchCallback) {
var msg = data.msg;
var offset = data.offset;
var length = data.length;
var port = data.port;
var address = data.address;
var callback = data.callback;
client.send(msg, offset, length, port, address, function(err) {
try {
callback(err);
} finally {
batchCallback();
}
});
}
/**
* Class to mimic the Socket interface for a UDP client, but to provided
* batching of outgoing sends using temporary Sockets that are created and
* destroyed as needed.
*/
function BatchingTemporarySocket() {
this.batchSend = batcher(batchSendData);
}
/**
* Provide the same signature as the Socket.send method but the work will be
* batched to share temporary UDP sockets whenever possible.
*/
BatchingTemporarySocket.prototype.send = function (msg, offset, length, port, address, callback) {
var work = {
msg: msg,
offset: offset,
length: length,
port: port,
address: address,
callback: callback
};
this.batchSend(work);
};
/**
* Segment emitter module.
* @module SegmentEmitter
*/
var SegmentEmitter = {
daemonConfig: require('./daemon_config'),
/**
* Returns the formatted segment JSON string.
* @param {Segment} segment - The segment to format.
*/
format: function format(segment) {
return PROTOCOL_HEADER + PROTOCOL_DELIMITER + segment.toString();
},
/**
* Creates a UDP socket connection and send the formatted segment.
* @param {Segment} segment - The segment to send to the daemon.
*/
send: function send(segment) {
if (!this.socket) {
if (this.useBatchingTemporarySocket) {
this.socket = new BatchingTemporarySocket();
} else {
this.socket = dgram.createSocket('udp4').unref();
}
}
var client = this.socket;
var formatted = segment.format();
var data = PROTOCOL_HEADER + PROTOCOL_DELIMITER + formatted;
var message = Buffer.from(data);
var short = '{"trace_id:"' + segment.trace_id + '","id":"' + segment.id + '"}';
var type = segment.type === 'subsegment' ? 'Subsegment' : 'Segment';
client.send(message, 0, message.length, this.daemonConfig.udp_port, this.daemonConfig.udp_ip, function(err) {
if (err) {
if (err.code === 'EMSGSIZE') {
logger.getLogger().error(type + ' too large to send: ' + short + ' (' + message.length + ' bytes).');
} else {
logger.getLogger().error('Error occured sending segment: ', err);
}
} else {
logger.getLogger().debug(type + ' sent: {"trace_id:"' + segment.trace_id + '","id":"' + segment.id + '"}');
logger.getLogger().debug('UDP message sent: ' + segment);
}
});
},
/**
* Configures the address and/or port the daemon is expected to be on.
* @param {string} address - Address of the daemon the segments should be sent to. Should be formatted as an IPv4 address.
* @module SegmentEmitter
* @function setDaemonAddress
*/
setDaemonAddress: function setDaemonAddress(address) {
this.daemonConfig.setDaemonAddress(address);
},
/**
* Get the UDP IP the emitter is configured to.
* @module SegmentEmitter
* @function getIp
*/
getIp: function getIp() {
return this.daemonConfig.udp_ip;
},
/**
* Get the UDP port the emitter is configured to.
* @module SegmentEmitter
* @function getPort
*/
getPort: function getPort() {
return this.daemonConfig.udp_port;
},
/**
* Forces the segment emitter to create a new socket on send, and close it on complete.
* @module SegmentEmitter
* @function disableReusableSocket
*/
disableReusableSocket: function() {
this.useBatchingTemporarySocket = true;
}
};
module.exports = SegmentEmitter;