modules/backend/app/agentsHandler.js (249 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ 'use strict'; const uuid = require('uuid/v4'); const fs = require('fs'); const path = require('path'); const JSZip = require('jszip'); const socketio = require('socket.io'); const _ = require('lodash'); // Fire me up! /** * Module interaction with agents. */ module.exports = { implements: 'agents-handler', inject: ['settings', 'mongo', 'agent-socket'] }; /** * @param settings * @param mongo * @param {AgentSocket} AgentSocket * @returns {AgentsHandler} */ module.exports.factory = function(settings, mongo, AgentSocket) { class AgentSockets { constructor() { /** * @type {Map.<String, Array.<String>>} */ this.sockets = new Map(); } get(account) { let sockets = this.sockets.get(account._id.toString()); if (_.isEmpty(sockets)) this.sockets.set(account._id.toString(), sockets = []); return sockets; } /** * @param {AgentSocket} sock * @param {String} account * @return {Array.<AgentSocket>} */ add(account, sock) { const sockets = this.get(account); sockets.push(sock); } /** * @param {Socket} browserSocket * @return {AgentSocket} */ find(browserSocket) { const {_id} = browserSocket.request.user; const sockets = this.sockets.get(_id); return _.find(sockets, (sock) => _.includes(sock.demo.browserSockets, browserSocket)); } } class Cluster { constructor(top) { const clusterName = top.clusterName; this.id = _.isEmpty(top.clusterId) ? uuid() : top.clusterId; this.name = _.isEmpty(clusterName) ? `Cluster ${this.id.substring(0, 8).toUpperCase()}` : clusterName; this.nids = top.nids; this.addresses = top.addresses; this.clients = top.clients; this.clusterVersion = top.clusterVersion; this.active = top.active; this.secured = top.secured; } isSameCluster(top) { return _.intersection(this.nids, top.nids).length > 0; } update(top) { this.clusterVersion = top.clusterVersion; this.nids = top.nids; this.addresses = top.addresses; this.clients = top.clients; this.clusterVersion = top.clusterVersion; this.active = top.active; this.secured = top.secured; } same(top) { return _.difference(this.nids, top.nids).length === 0 && _.isEqual(this.addresses, top.addresses) && this.clusterVersion === top.clusterVersion && this.active === top.active; } } /** * Connected agents manager. */ class AgentsHandler { /** * @constructor */ constructor() { /** * Connected agents. * @type {AgentSockets} */ this._agentSockets = new AgentSockets(); this.clusters = []; this.topLsnrs = []; } /** * Collect supported agents list. * @private */ _collectSupportedAgents() { const jarFilter = (file) => path.extname(file) === '.jar'; const agentArchives = fs.readdirSync(settings.agent.dists) .filter((file) => path.extname(file) === '.zip'); const agentsPromises = _.map(agentArchives, (fileName) => { const filePath = path.join(settings.agent.dists, fileName); return JSZip.loadAsync(fs.readFileSync(filePath)) .then((zip) => { const jarPath = _.find(_.keys(zip.files), jarFilter); return JSZip.loadAsync(zip.files[jarPath].async('nodebuffer')) .then((jar) => jar.files['META-INF/MANIFEST.MF'].async('string')) .then((lines) => _.reduce(lines.split(/\s*\n+\s*/), (acc, line) => { if (!_.isEmpty(line)) { const arr = line.split(/\s*:\s*/); acc[arr[0]] = arr[1]; } return acc; }, {})) .then((manifest) => { const ver = manifest['Implementation-Version']; const buildTime = manifest['Build-Time']; if (ver && buildTime) return { fileName, filePath, ver, buildTime }; }); }); }); return Promise.all(agentsPromises) .then((descs) => { const agentDescs = _.keyBy(_.remove(descs, null), 'ver'); const latestVer = _.head(Object.keys(agentDescs).sort((a, b) => { const aParts = a.split('.'); const bParts = b.split('.'); for (let i = 0; i < aParts.length; ++i) { if (aParts[i] !== bParts[i]) return aParts[i] < bParts[i] ? 1 : -1; } if (aParts.length === bParts.length) return 0; return aParts.length < bParts.length ? 1 : -1; })); // Latest version of agent distribution. if (latestVer) agentDescs.current = agentDescs[latestVer]; return agentDescs; }); } getOrCreateCluster(top) { let cluster = _.find(this.clusters, (c) => c.isSameCluster(top)); if (_.isNil(cluster)) { cluster = new Cluster(top); this.clusters.push(cluster); } return cluster; } /** * Add topology listener. * * @param lsnr */ addTopologyListener(lsnr) { this.topLsnrs.push(lsnr); } /** * Link agent with browsers by account. * * @param {Socket} sock * @param {Array.<mongo.Account>} accounts * @param {Array.<String>} tokens * @param {boolean} demoEnabled * * @private */ onConnect(sock, accounts, tokens, demoEnabled) { const agentSocket = new AgentSocket(sock, accounts, tokens, demoEnabled); _.forEach(accounts, (account) => { this._agentSockets.add(account, agentSocket); this._browsersHnd.agentStats(account); }); sock.on('disconnect', () => { _.forEach(accounts, (account) => { _.pull(this._agentSockets.get(account), agentSocket); this._browsersHnd.agentStats(account); }); }); sock.on('cluster:topology', (top) => { if (_.isNil(top)) { console.log('Invalid format of message: "cluster:topology"'); return; } const cluster = this.getOrCreateCluster(top); _.forEach(this.topLsnrs, (lsnr) => lsnr(agentSocket, cluster, top)); if (agentSocket.cluster !== cluster) { agentSocket.cluster = cluster; _.forEach(accounts, (account) => { this._browsersHnd.agentStats(account); }); } else { const changed = !cluster.same(top); if (changed) { cluster.update(top); _.forEach(accounts, (account) => { this._browsersHnd.clusterChanged(account, cluster); }); } } }); sock.on('cluster:disconnected', () => { const newTop = _.assign({}, agentSocket.cluster, {nids: []}); _.forEach(this.topLsnrs, (lsnr) => lsnr(agentSocket, agentSocket.cluster, newTop)); agentSocket.cluster = null; _.forEach(accounts, (account) => { this._browsersHnd.agentStats(account); }); }); return agentSocket; } getAccounts(tokens) { return mongo.Account.find({token: {$in: tokens}}, '_id token').lean().exec() .then((accounts) => ({accounts, activeTokens: _.uniq(_.map(accounts, 'token'))})); } /** * @param {http.Server|https.Server} srv Server instance that we want to attach agent handler. * @param {BrowsersHandler} browsersHnd */ attach(srv, browsersHnd) { this._browsersHnd = browsersHnd; this._collectSupportedAgents() .then((supportedAgents) => { this.currentAgent = _.get(supportedAgents, 'current'); if (this.io) throw 'Agent server already started!'; this.io = socketio(srv, {path: '/agents'}); this.io.on('connection', (sock) => { const sockId = sock.id; console.log('Connected agent with socketId: ', sockId); sock.on('disconnect', (reason) => { console.log(`Agent disconnected with [socketId=${sockId}, reason=${reason}]`); }); sock.on('agent:auth', ({ver, bt, tokens, disableDemo} = {}, cb) => { console.log(`Received authentication request [socketId=${sockId}, tokens=${tokens}, ver=${ver}].`); if (_.isEmpty(tokens)) return cb('Tokens not set. Please reload agent archive or check settings'); if (ver && bt && !_.isEmpty(supportedAgents)) { const btDistr = _.get(supportedAgents, [ver, 'buildTime']); if (_.isEmpty(btDistr) || btDistr !== bt) return cb('You are using an older version of the agent. Please reload agent'); } return this.getAccounts(tokens) .then(({accounts, activeTokens}) => { if (_.isEmpty(activeTokens)) return cb(`Failed to authenticate with token(s): ${tokens.join(',')}. Please reload agent archive or check settings`); cb(null, activeTokens); return this.onConnect(sock, accounts, activeTokens, !disableDemo); }) // TODO IGNITE-1379 send error to web master. .catch(() => cb(`Invalid token(s): ${tokens.join(',')}. Please reload agent archive or check settings`)); }); }); }) .catch(() => { console.log('Failed to collect supported agents'); }); } agent(account, demo, clusterId) { if (!this.io) return Promise.reject(new Error('Agent server not started yet!')); const socks = this._agentSockets.get(account); if (_.isEmpty(socks)) return Promise.reject(new Error('Failed to find connected agent for this account')); if (demo) { const sock = _.find(socks, (sock) => sock.demo.enabled); if (sock) return Promise.resolve(sock); return Promise.reject(new Error('Demo mode disabled by administrator')); } if (_.isNil(clusterId)) return Promise.resolve(_.head(socks)); const sock = _.find(socks, (agentSock) => _.get(agentSock, 'cluster.id') === clusterId); if (_.isEmpty(sock)) return Promise.reject(new Error('Failed to find agent connected to cluster')); return Promise.resolve(sock); } agents(account) { if (!this.io) return Promise.reject(new Error('Agent server not started yet!')); const socks = this._agentSockets.get(account); if (_.isEmpty(socks)) return Promise.reject(new Error('Failed to find connected agent for this token')); return Promise.resolve(socks); } /** * Try stop agent for token if not used by others. * * @param {mongo.Account} account */ onTokenReset(account) { if (_.isNil(this.io)) return; const agentSockets = this._agentSockets.get(account); _.forEach(agentSockets, (sock) => sock.resetToken(account.token)); } } return new AgentsHandler(); };