packages/core/lib/segment_emitter.js (99 lines of code) (raw):

var dgram = require('dgram'); var batcher = require('atomic-batcher'); var logger = require('./logger'); var PROTOCOL_HEADER = '{"format":"json","version":1}'; var PROTOCOL_DELIMITER = '\n'; /** * Sends a collection of data over a UDP socket. This method * is designed to be used by `atomic-batcher` as a way to share * a single UDP socket for sending multiple data blocks. * * @param {object} ops - Details of the data to send * @param {Function} callback - The function to call when done */ function batchSendData (ops, callback) { var client = dgram.createSocket('udp4'); executeSendData(client, ops, 0, function () { try { client.close(); } finally { callback(); } }); } /** * Execute sending data starting at the specified index and * using the provided client. * * @param {Socket} client - Socket to send data with * @param {object} ops - Details of data to send * @param {number} index - Starting index for sending * @param {Function} callback - Function to call when done */ function executeSendData (client, ops, index, callback) { if (index >= ops.length) { callback(); return; } sendMessage(client, ops[index], function () { executeSendData(client, ops, index+1, callback); }); } /** * Send a single message over a UDP socket. * * @param {Socket} client - Socket to send data with * @param {object} data - Details of the data to send * @param {Function} batchCallback - Function to call when done */ function sendMessage (client, data, batchCallback) { var msg = data.msg; var offset = data.offset; var length = data.length; var port = data.port; var address = data.address; var callback = data.callback; client.send(msg, offset, length, port, address, function(err) { try { callback(err); } finally { batchCallback(); } }); } /** * Class to mimic the Socket interface for a UDP client, but to provided * batching of outgoing sends using temporary Sockets that are created and * destroyed as needed. */ function BatchingTemporarySocket() { this.batchSend = batcher(batchSendData); } /** * Provide the same signature as the Socket.send method but the work will be * batched to share temporary UDP sockets whenever possible. */ BatchingTemporarySocket.prototype.send = function (msg, offset, length, port, address, callback) { var work = { msg: msg, offset: offset, length: length, port: port, address: address, callback: callback }; this.batchSend(work); }; /** * Segment emitter module. * @module SegmentEmitter */ var SegmentEmitter = { daemonConfig: require('./daemon_config'), /** * Returns the formatted segment JSON string. * @param {Segment} segment - The segment to format. */ format: function format(segment) { return PROTOCOL_HEADER + PROTOCOL_DELIMITER + segment.toString(); }, /** * Creates a UDP socket connection and send the formatted segment. * @param {Segment} segment - The segment to send to the daemon. */ send: function send(segment) { if (!this.socket) { if (this.useBatchingTemporarySocket) { this.socket = new BatchingTemporarySocket(); } else { this.socket = dgram.createSocket('udp4').unref(); } } var client = this.socket; var formatted = segment.format(); var data = PROTOCOL_HEADER + PROTOCOL_DELIMITER + formatted; var message = Buffer.from(data); var short = '{"trace_id:"' + segment.trace_id + '","id":"' + segment.id + '"}'; var type = segment.type === 'subsegment' ? 'Subsegment' : 'Segment'; client.send(message, 0, message.length, this.daemonConfig.udp_port, this.daemonConfig.udp_ip, function(err) { if (err) { if (err.code === 'EMSGSIZE') { logger.getLogger().error(type + ' too large to send: ' + short + ' (' + message.length + ' bytes).'); } else { logger.getLogger().error('Error occured sending segment: ', err); } } else { logger.getLogger().debug(type + ' sent: {"trace_id:"' + segment.trace_id + '","id":"' + segment.id + '"}'); logger.getLogger().debug('UDP message sent: ' + segment); } }); }, /** * Configures the address and/or port the daemon is expected to be on. * @param {string} address - Address of the daemon the segments should be sent to. Should be formatted as an IPv4 address. * @module SegmentEmitter * @function setDaemonAddress */ setDaemonAddress: function setDaemonAddress(address) { this.daemonConfig.setDaemonAddress(address); }, /** * Get the UDP IP the emitter is configured to. * @module SegmentEmitter * @function getIp */ getIp: function getIp() { return this.daemonConfig.udp_ip; }, /** * Get the UDP port the emitter is configured to. * @module SegmentEmitter * @function getPort */ getPort: function getPort() { return this.daemonConfig.udp_port; }, /** * Forces the segment emitter to create a new socket on send, and close it on complete. * @module SegmentEmitter * @function disableReusableSocket */ disableReusableSocket: function() { this.useBatchingTemporarySocket = true; } }; module.exports = SegmentEmitter;