provider/lib/utils.js (444 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ var request = require('request'); var HttpStatus = require('http-status-codes'); var constants = require('./constants.js'); var authHandler = require('./authHandler'); module.exports = function(logger, triggerDB, redisClient) { var retryAttempts = constants.RETRY_ATTEMPTS; var filterDDName = constants.FILTERS_DESIGN_DOC; var viewDDName = constants.VIEWS_DESIGN_DOC; var triggersByWorker = constants.TRIGGERS_BY_WORKER; var redisKeyPrefix = process.env.REDIS_KEY_PREFIX || triggerDB.config.db; var self = this; this.triggers = {}; this.endpointAuth = process.env.ENDPOINT_AUTH; this.routerHost = process.env.ROUTER_HOST || 'localhost'; this.worker = process.env.WORKER || 'worker0'; this.host = process.env.HOST_INDEX || 'host0'; this.hostPrefix = this.host.replace(/\d+$/, ''); this.activeHost = `${this.hostPrefix}0`; //default value on init (will be updated for existing redis) this.db = triggerDB; this.redisClient = redisClient; this.redisKey = redisKeyPrefix + '_' + this.worker; this.redisField = constants.REDIS_FIELD; this.uriHost ='https://' + this.routerHost; this.monitorStatus = {}; // Add a trigger: listen for changes and dispatch. this.createTrigger = function(triggerData) { var method = 'createTrigger'; var Cloudant = require('@cloudant/cloudant'); var cloudantConnection; if (triggerData.iamApiKey) { var dbURL = `${triggerData.protocol}://${triggerData.host}`; if (triggerData.port) { dbURL += ':' + triggerData.port; } cloudantConnection = new Cloudant({ url: dbURL, plugins: { iamauth: { iamApiKey: triggerData.iamApiKey, iamTokenUrl: triggerData.iamUrl } } }); } else { var url = `${triggerData.protocol}://${triggerData.user}:${triggerData.pass}@${triggerData.host}`; if (triggerData.port) { url += ':' + triggerData.port; } cloudantConnection = Cloudant(url); } try { var triggeredDB = cloudantConnection.use(triggerData.dbname); // Listen for changes on this database. var feed = triggeredDB.follow({since: triggerData.since, include_docs: false}); if (triggerData.filter) { feed.filter = triggerData.filter; } if (triggerData.query_params) { feed.query_params = triggerData.query_params; } triggerData.feed = feed; self.triggers[triggerData.id] = triggerData; feed.on('change', function (change) { var triggerHandle = self.triggers[triggerData.id]; if (triggerHandle && shouldFireTrigger(triggerHandle) && hasTriggersRemaining(triggerHandle)) { logger.info(method, 'Trigger', triggerData.id, 'got change from', triggerData.dbname); try { fireTrigger(triggerData.id, change); } catch (e) { logger.error(method, 'Exception occurred while firing trigger', triggerData.id, e); } } }); feed.follow(); return new Promise(function(resolve, reject) { feed.on('error', function (err) { logger.error(method,'Error occurred for trigger', triggerData.id, '(db ' + triggerData.dbname + '):', err); reject(err); }); feed.on('confirm', function () { logger.info(method, 'Added plugin provider data trigger', triggerData.id, 'listening for changes in database', triggerData.dbname); if (isMonitoringTrigger(triggerData.monitor, triggerData.id)) { self.monitorStatus.triggerStarted = "success"; } resolve(triggerData.id); }); }); } catch (err) { logger.info(method, 'caught an exception for trigger', triggerData.id, err); return Promise.reject(err); } }; function initTrigger(newTrigger) { var maxTriggers = newTrigger.maxTriggers || constants.DEFAULT_MAX_TRIGGERS; var trigger = { id: newTrigger.id, host: newTrigger.host, port: newTrigger.port, protocol: newTrigger.protocol || 'https', dbname: newTrigger.dbname, user: newTrigger.user, pass: newTrigger.pass, apikey: newTrigger.apikey, since: newTrigger.since || 'now', maxTriggers: maxTriggers, triggersLeft: maxTriggers, filter: newTrigger.filter, query_params: newTrigger.query_params, additionalData: newTrigger.additionalData, iamApiKey: newTrigger.iamApiKey, iamUrl: newTrigger.iamUrl }; return trigger; } function shouldDisableTrigger(statusCode) { return ((statusCode >= 400 && statusCode < 500) && [HttpStatus.REQUEST_TIMEOUT, HttpStatus.TOO_MANY_REQUESTS, HttpStatus.CONFLICT].indexOf(statusCode) === -1); } function shouldFireTrigger(trigger) { return trigger.monitor || self.activeHost === self.host; } function hasTriggersRemaining(trigger) { return !trigger.maxTriggers || trigger.maxTriggers === -1 || trigger.triggersLeft > 0; } function isMonitoringTrigger(monitor, triggerIdentifier) { return monitor && self.monitorStatus.triggerName === parseQName(triggerIdentifier).name; } function disableTrigger(id, statusCode, message) { var method = 'disableTrigger'; triggerDB.get(id, function (err, existing) { if (!err) { if (!existing.status || existing.status.active === true) { var updatedTrigger = existing; var status = { 'active': false, 'dateChanged': Date.now(), 'reason': {'kind': 'AUTO', 'statusCode': statusCode, 'message': message} }; updatedTrigger.status = status; triggerDB.insert(updatedTrigger, id, function (err) { if (err) { logger.error(method, 'there was an error while disabling', id, 'in database. ' + err); } else { logger.info(method, 'trigger', id, 'successfully disabled in database'); } }); } } else { logger.info(method, 'could not find', id, 'in database'); //make sure it is removed from memory as well deleteTrigger(id); } }); } // Delete a trigger: stop listening for changes and remove it. function deleteTrigger(triggerIdentifier, monitorTrigger) { var method = 'deleteTrigger'; if (self.triggers[triggerIdentifier]) { if (self.triggers[triggerIdentifier].feed) { self.triggers[triggerIdentifier].feed.stop(); } delete self.triggers[triggerIdentifier]; logger.info(method, 'trigger', triggerIdentifier, 'successfully deleted from memory'); if (isMonitoringTrigger(monitorTrigger, triggerIdentifier)) { self.monitorStatus.triggerStopped = "success"; } } } function fireTrigger(triggerIdentifier, change) { var method = 'fireTrigger'; var triggerData = self.triggers[triggerIdentifier]; var triggerObj = parseQName(triggerData.id); var form = change; form.dbname = triggerData.dbname; logger.info(method, 'firing trigger', triggerData.id, 'with db update'); var host = 'https://' + self.routerHost; var uri = host + '/api/v1/namespaces/' + triggerObj.namespace + '/triggers/' + triggerObj.name; postTrigger(triggerData, form, uri, 0) .then(triggerId => { logger.info(method, 'Trigger', triggerId, 'was successfully fired'); if (isMonitoringTrigger(triggerData.monitor, triggerId)) { self.monitorStatus.triggerFired = "success"; } if (triggerData.triggersLeft === 0) { if (triggerData.monitor) { deleteTrigger(triggerId, triggerData.monitor); } else { disableTrigger(triggerId, undefined, 'Automatically disabled after reaching max triggers'); logger.warn(method, 'no more triggers left, disabled', triggerId); } } }) .catch(err => { logger.error(method, err); }); } function postTrigger(triggerData, form, uri, retryCount) { var method = 'postTrigger'; return new Promise(function(resolve, reject) { // only manage trigger fires if they are not infinite if (triggerData.maxTriggers !== -1) { triggerData.triggersLeft--; } self.authRequest(triggerData, { method: 'post', uri: uri, json: form }, function(error, response) { try { var statusCode; if (!error) { statusCode = response.statusCode; } else if (error.statusCode) { statusCode = error.statusCode; } logger.info(method, triggerData.id, 'http post request, STATUS:', statusCode); if (error || statusCode >= 400) { // only manage trigger fires if they are not infinite if (triggerData.maxTriggers !== -1) { triggerData.triggersLeft++; } logger.error(method, 'there was an error invoking', triggerData.id, statusCode || error); if (!error && shouldDisableTrigger(statusCode)) { //disable trigger var message = 'Automatically disabled after receiving a ' + statusCode + ' status code when firing the trigger'; disableTrigger(triggerData.id, statusCode, message); reject('Disabled trigger ' + triggerData.id + ' due to status code: ' + statusCode); } else { if (retryCount < retryAttempts ) { var timeout = statusCode === 429 && retryCount === 0 ? 60000 : 1000 * Math.pow(retryCount + 1, 2); logger.info(method, 'attempting to fire trigger again', triggerData.id, 'Retry Count:', (retryCount + 1)); setTimeout(function () { postTrigger(triggerData, form, uri, (retryCount + 1)) .then(triggerId => { resolve(triggerId); }) .catch(err => { reject(err); }); }, timeout); } else { reject('Unable to reach server to fire trigger ' + triggerData.id); } } } else { logger.info(method, 'fired', triggerData.id, triggerData.triggersLeft, 'triggers left'); resolve(triggerData.id); } } catch(err) { reject('Exception occurred while firing trigger ' + err); } }); }); } this.initAllTriggers = function() { var method = 'initAllTriggers'; //follow the trigger DB setupFollow('now'); logger.info(method, 'resetting system from last state'); triggerDB.view(viewDDName, triggersByWorker, {reduce: false, include_docs: true, key: self.worker}, function(err, body) { if (!err) { body.rows.forEach(function (trigger) { var triggerIdentifier = trigger.id; var doc = trigger.doc; if (!(triggerIdentifier in self.triggers)) { //check if trigger still exists in whisk db var triggerObj = parseQName(triggerIdentifier); var host = 'https://' + self.routerHost + ':' + 443; var triggerURL = host + '/api/v1/namespaces/' + triggerObj.namespace + '/triggers/' + triggerObj.name; logger.info(method, 'Checking if trigger', triggerIdentifier, 'still exists'); self.authRequest(doc, { method: 'get', url: triggerURL }, function (error, response) { //disable trigger in database if trigger is dead if (!error && shouldDisableTrigger(response.statusCode)) { var message = 'Automatically disabled after receiving a ' + response.statusCode + ' status code on init trigger'; disableTrigger(triggerIdentifier, response.statusCode, message); logger.error(method, 'trigger', triggerIdentifier, 'has been disabled due to status code:', response.statusCode); } else { self.createTrigger(initTrigger(doc)) .then(triggerIdentifier => { logger.info(method, triggerIdentifier, 'created successfully'); }) .catch(err => { var message = 'Automatically disabled after receiving exception on init trigger: ' + err; disableTrigger(triggerIdentifier, undefined, message); logger.error(method, 'Disabled trigger', triggerIdentifier, 'due to exception:', err); }); } }); } }); } else { logger.error(method, 'could not get latest state from database', err); } }); }; function setupFollow(seq) { var method = 'setupFollow'; try { var feed = triggerDB.follow({ since: seq, include_docs: true, filter: filterDDName + '/' + triggersByWorker, query_params: {worker: self.worker} }); feed.on('change', (change) => { var triggerIdentifier = change.id; var doc = change.doc; if (self.triggers[triggerIdentifier]) { if (doc.status && doc.status.active === false) { deleteTrigger(triggerIdentifier); } } else { //ignore changes to disabled triggers if (!doc.status || doc.status.active === true) { self.createTrigger(initTrigger(doc)) .then(triggerIdentifier => { logger.info(method, triggerIdentifier, 'created successfully'); }) .catch(err => { var message = 'Automatically disabled after receiving exception on create trigger: ' + err; disableTrigger(triggerIdentifier, undefined, message); logger.error(method, 'Disabled trigger', triggerIdentifier, 'due to exception:', err); }); } } }); feed.on('error', function (err) { logger.error(method, err); }); feed.follow(); } catch (err) { logger.error(method, err); } } this.authorize = function(req, res, next) { var method = 'authorize'; if (self.endpointAuth) { if (!req.headers.authorization) { res.set('www-authenticate', 'Basic realm="Private"'); res.status(HttpStatus.UNAUTHORIZED); return res.send(''); } var parts = req.headers.authorization.split(' '); if (parts[0].toLowerCase() !== 'basic' || !parts[1]) { return sendError(method, HttpStatus.BAD_REQUEST, 'Malformed request, basic authentication expected', res); } var auth = new Buffer(parts[1], 'base64').toString(); auth = auth.match(/^([^:]*):(.*)$/); if (!auth) { return sendError(method, HttpStatus.BAD_REQUEST, 'Malformed request, authentication invalid', res); } var uuid = auth[1]; var key = auth[2]; var endpointAuth = self.endpointAuth.split(':'); if (endpointAuth[0] === uuid && endpointAuth[1] === key) { next(); } else { logger.warn(method, 'Invalid key'); return sendError(method, HttpStatus.UNAUTHORIZED, 'Invalid key', res); } } else { next(); } }; function sendError(method, code, message, res) { logger.error(method, message); res.status(code).json({error: message}); } function parseQName(qname, separator) { var parsed = {}; var delimiter = separator || ':'; var defaultNamespace = '_'; if (qname && qname.charAt(0) === delimiter) { var parts = qname.split(delimiter); parsed.namespace = parts[1]; parsed.name = parts.length > 2 ? parts.slice(2).join(delimiter) : ''; } else { parsed.namespace = defaultNamespace; parsed.name = qname; } return parsed; } this.initRedis = function() { var method = 'initRedis'; return new Promise(function(resolve, reject) { if (redisClient) { var subscriber = redisClient.duplicate(); //create a subscriber client that listens for requests to perform swap subscriber.on('message', function (channel, message) { logger.info(method, message, 'set to active host in channel', channel); self.activeHost = message; }); subscriber.on('error', function (err) { logger.error(method, 'Error connecting to redis', err); reject(err); }); subscriber.subscribe(self.redisKey); redisClient.hgetAsync(self.redisKey, self.redisField) .then(activeHost => { return initActiveHost(activeHost); }) .then(() => { process.on('SIGTERM', function onSigterm() { if (self.activeHost === self.host) { var redundantHost = self.host === `${self.hostPrefix}0` ? `${self.hostPrefix}1` : `${self.hostPrefix}0`; self.redisClient.hsetAsync(self.redisKey, self.redisField, redundantHost) .then(() => { self.redisClient.publish(self.redisKey, redundantHost); }) .catch(err => { logger.error(method, err); }); } }); resolve(); }) .catch(err => { reject(err); }); } else { resolve(); } }); }; function initActiveHost(activeHost) { var method = 'initActiveHost'; if (activeHost === null) { //initialize redis key with active host logger.info(method, 'redis hset', self.redisKey, self.redisField, self.activeHost); return redisClient.hsetAsync(self.redisKey, self.redisField, self.activeHost); } else { self.activeHost = activeHost; return Promise.resolve(); } } this.authRequest = function(triggerData, options, cb) { var method = 'authRequest'; authHandler.handleAuth(triggerData) .then(auth => { options.auth = auth; request(options, cb); }) .catch(err => { logger.error(method, err); cb(err); }); }; };