index.js (284 lines of code) (raw):
module.exports = Sevnup;
var _ = require('lodash');
var farmhash = require('farmhash');
var async = require('async');
var CacheStore = require('./cache_store');
var DEFAULT_TOTAL_VNODES = 1024;
var DEFAULT_CALM_THRESHOLD = 500;
var DEFAULT_RETRY_INTERVAL_MS = 5000;
// We're setting a modest limit to make sure we're not starving the event-loop of CPU
// with overly aggressive setting
var DEFAULT_MAX_PARALLEL_TASKS = 2;
/**
* Params are:
* hashRing
* hashRingLookup (optional)
* store
* recoverKey
* releaseKey
* totalVNodes
* addOnLookup
* logger
* watchMode
*/
function Sevnup(params) {
this.hashRing = params.hashRing;
this.hashRingLookup = params.hashRingLookup || this.hashRing.lookup.bind(this.hashRing);
this.store = new CacheStore(params.store);
this.recoverKeyCallback = params.recoverKey;
this.releaseKeyCallback = params.releaseKey;
this.totalVNodes = params.totalVNodes || DEFAULT_TOTAL_VNODES;
this.logger = params.logger;
this.statsd = params.statsd;
this.calmThreshold = params.calmThreshold || DEFAULT_CALM_THRESHOLD;
this.calmTimeout = null;
this.watchMode = params.watchMode;
this.running = true;
this.retryIntervalMs = params.retryIntervalMs || DEFAULT_RETRY_INTERVAL_MS;
this.maxConcurrencyLevel = params.maxConcurrencyLevel || DEFAULT_MAX_PARALLEL_TASKS;
this.retryRecoverOnFailure = params.retryRecoverOnFailure || false;
this.ownedVNodes = [];
this.stateChangeQueue = async.queue(this._handleRingStateChange.bind(this), 1);
// Separate the two task types such that a bad key wont prevent a key from being recovered
this.keyRetryQueue = async.queue(this._retryInQueue.bind(this), 1);
this.loadKeyRetryQueue = async.queue(this._retryInQueue.bind(this), 1);
this.eventHandler = this._onRingStateChange.bind(this);
this._attachToRing(params.addOnLookup);
}
/**
* Adds a key to sevnup if it belongs to the current worker.
* @param {string} key The key you want to add.
* @param {function} done Optional callback if you want to listen to completion
*/
Sevnup.prototype.addKey = function addKey(key, done) {
var vnode = this.getVNodeForKey(key);
return this.addKeyToVNode(key, vnode, done);
};
Sevnup.prototype.addKeyToVNode = function addKeyToVNode(key, vnode, done) {
var self = this;
var node = this.hashRingLookup(vnode);
if (this.hashRing.whoami() === node) {
this.store.addKey(vnode, key, function(err) {
if (err) {
self.logger.error("Sevnup.sevnupLookup failed to persist key", {
vnode: vnode,
key: key,
error: err
});
}
if (done) {
done(err);
}
});
} else if (done) {
done();
}
};
/**
* When you are done working on a key, or no longer want it within bookkeeping
* you can alert sevnup to forget it. This notifies the ring that it doesn't
* need attention in the event this node goes down or hands off ownership.
* We want the service to be ignorant of vnodes so we rediscover the vnode.
* @param {string} key The key you have finished work on.
* @param {function} done Optional callback if you want to listen to completion
*/
Sevnup.prototype.workCompleteOnKey = function workCompleteOnKey(key, done) {
var vnode = this.getVNodeForKey(key);
this.workCompleteOnKeyInVNode(key, vnode, done);
};
Sevnup.prototype.workCompleteOnKeyInVNode = function workCompleteOnKeyInVNode(key, vnode, done) {
this.store.removeKey(vnode, key, done);
};
Sevnup.prototype.getOwnedKeys = function getOwnedKeys(done) {
var self = this;
async.waterfall([
async.mapLimit.bind(async, self._getOwnedVNodes(), self.maxConcurrencyLevel, self.store.loadKeys.bind(self.store)),
function(keys, next) {
next(null, _.flatten(keys));
}
], done);
};
Sevnup.prototype._attachToRing = function _attachToRing(addOnLookup) {
var self = this;
if (!this.watchMode) {
this.hashRing.lookup = function sevnupLookup(key) {
var vnode = self.getVNodeForKey(key);
var node = self.hashRingLookup(vnode);
if (addOnLookup) {
self.addKey(key);
}
return node;
};
}
if (this.hashRing.isReady) {
onReady();
} else {
this.hashRing.on('ready', onReady);
}
function onReady() {
self.hashRing.on('ringChanged', self.eventHandler);
self._onRingStateChange();
}
};
/**
* Returns true if this node currently owns vnode.
* @param {string} vnodeName The name of the vnode to check.
*/
Sevnup.prototype._iOwnVNode = function _iOwnVNode(vnodeName) {
// Use the non-patched lookup
var node = this.hashRingLookup(vnodeName);
return this.hashRing.whoami() === node;
};
Sevnup.prototype._getOwnedVNodes = function _getOwnedVNodes() {
var results = [];
for (var i = 0; i < this.totalVNodes; ++i) {
if (this._iOwnVNode(i)) {
results.push(i);
}
}
return results;
};
Sevnup.prototype._onRingStateChange = function _onRingStateChange() {
var self = this;
// This is ignored due to natural complexity of testing
// this
/* istanbul ignore next */
if (!this.running) {
// Shutdown
return;
}
if (this.stateChangeQueue.length() > 0) {
// Ring change already queued
return;
}
if (this.calmTimeout) {
clearTimeout(this.calmTimeout);
}
this.calmTimeout = setTimeout(execute, this.calmThreshold);
function execute() {
self.stateChangeQueue.push(true);
}
};
Sevnup.prototype._retryInQueue = function _retryInQueue(task, done) {
this._withRetry(task.retryName, task.fn, function _retryDone() {
task.cb.apply(task.cb, arguments);
// NOTE: This is a callback passed by `async.queue` it COULD NOT be
// called synchronously
setImmediate(done);
});
};
Sevnup.prototype._handleRingStateChange = function _handleRingStateChange(arg, done) {
var self = this;
var oldOwnedVNodes = self.ownedVNodes;
var newOwnedVNodes = self.ownedVNodes = self._getOwnedVNodes();
var nodesToRelease = _.difference(oldOwnedVNodes, newOwnedVNodes);
var nodesToRecover = _.difference(newOwnedVNodes, oldOwnedVNodes);
self.logger.info('Sevnup._onRingStateChange', {
releasing: nodesToRelease,
recovering: nodesToRecover
});
async.parallel([
self._forEachKeyInVNodesWithRetry.bind(self, self.retryRecoverOnFailure, nodesToRelease, self._releaseKey.bind(self)),
self._forEachKeyInVNodesWithRetry.bind(self, self.retryRecoverOnFailure, nodesToRecover, self._recoverKey.bind(self))
], function() {
nodesToRelease.forEach(self.store.releaseFromCache.bind(self.store));
done();
});
};
Sevnup.prototype._forEachKeyInVNodesWithRetry = function _forEachKeyInVNodesWithRetry(retryErrors, vnodes, onKey, done) {
var self = this;
async.eachLimit(vnodes, self.maxConcurrencyLevel, onVNode, done);
function onVNode(vnode, next) {
async.waterfall([
function _loadWithRetries(wNext) {
_tryWithRetry("loadkeys", self.loadKeyRetryQueue,
self.store.loadKeys.bind(self.store, vnode), wNext);
},
onKeys.bind(null, vnode),
], next);
}
function onKeys(vnode, keys, next) {
async.eachLimit(keys, self.maxConcurrencyLevel, function _each(key, eachNext) {
_tryWithRetry("onkey", self.keyRetryQueue, function _each(cb) {
// Instead of calling the key-handler directly, effectively chaining them
// we're enqueuing them instead
setImmediate(onKey, vnode, key, cb);
}, eachNext);
}, next);
}
function _tryWithRetry(retryName, queue, fn, cb) {
if (retryErrors) {
self._doThenQueue(queue, {
retryName: retryName,
fn: fn,
cb: cb
});
} else {
fn(function _noRetry() {
// Ignore errors
cb.apply(cb, [null].concat(Array.prototype.slice.call(arguments, 1)));
});
}
}
};
Sevnup.prototype._forEachKeyInVNodes = function _forEachKeyInVNodes(vnodes, onKey, done) {
this._forEachKeyInVNodesWithRetry(false, vnodes, onKey, done);
};
Sevnup.prototype._doThenQueue = function _doThenQueue(queue, task) {
task.fn(function _checkFailure(err) {
if (err) {
// Queue
queue.push(task);
return;
}
task.cb.apply(task.cb, arguments);
});
};
Sevnup.prototype._withRetry = function _withRetry(retryName, fn, done) {
var self = this;
fn(function _checkError(err) {
if (err) {
self.maybeIncrementStat('sevnup.retrying', {
retrytype: retryName
});
setTimeout(
self._withRetry.bind(self, retryName, fn, done),
self.retryIntervalMs
).unref();
return;
}
done.apply(done, arguments);
});
};
Sevnup.prototype._recoverKey = function _recoverKey(vnode, key, done) {
var self = this;
async.waterfall([
this.recoverKeyCallback.bind(this, key),
function(handled, next) {
if (handled) {
self.store.removeKey(vnode, key, function(err) {
if (err) {
self.logger.error("Sevnup._recoverKey failed to remove key from vnode", {
vnode: vnode,
key: key,
error: err
});
}
// Swallow
next();
});
} else {
next();
}
}
], function(err) {
if (err) {
self.logger.error("Sevnup._recoverKey encountered an error", {
vnode: vnode,
key: key,
error: err
});
}
// We should propogate errors so we can properly retry if we have that setup
done(err);
});
};
Sevnup.prototype._releaseKey = function _releaseKey(vnode, key, done) {
var self = this;
this.releaseKeyCallback(key, function(err) {
if (err) {
self.logger.error("Sevnup._releaseKey encountered an error", {
vnode: vnode,
key: key,
error: err
});
}
// We should propogate errors so we can properly retry if we have that setup
done(err);
});
};
/**
* Given a key, get the vnode it belongs to. It can then be routed to the
* correct node, via looking up by vnode name.
* @param {string} key The key to match to a vnode.
*/
Sevnup.prototype.getVNodeForKey = function getVNodeForKey(key) {
return farmhash.hash32v1(key) % this.totalVNodes;
};
Sevnup.prototype.destroy = function destroy() {
clearTimeout(this.calmTimeout);
};
Sevnup.prototype.shutdownAndRelease = function shutdownAndRelease(done) {
var self = this;
this.destroy();
this.running = false;
this.hashRing.removeListener('ringChanged', this.eventHandler);
if (this.stateChangeQueue.idle()) {
releaseAll();
} else {
this.stateChangeQueue.drain = releaseAll;
}
function releaseAll() {
self._forEachKeyInVNodes(self.ownedVNodes, self._releaseKey.bind(self), done);
}
};
Sevnup.prototype.isPotentiallyOwnedKey = function isPotentiallyOwnedKey(key) {
var vnode = this.getVNodeForKey(key);
var node = this.hashRingLookup(vnode);
return this.hashRing.whoami() === node;
};
Sevnup.prototype.maybeIncrementStat = function maybeIncrementStat(statName, tags) {
if (this.statsd) {
this.statsd.increment(statName, 1, {
tags: tags
});
}
};