in device/index.js [688:750]
function _drainOperationQueue() {
//
// Handle our active subscriptions first, using a cloned
// copy of the array. We shift them out one-by-one until
// all have been processed, leaving the official record
// of active subscriptions untouched.
//
var subscription = clonedSubscriptions.shift();
if (!isUndefined(subscription)) {
//
// If the 3rd argument (namely callback) is not present, we will
// use two-argument form to call mqtt.Client#subscribe(), which
// supports both subscribe(topics, options) and subscribe(topics, callback).
//
if (!isUndefined(subscription.callback)) {
device.subscribe(subscription.topic, subscription.options, subscription.callback);
} else {
device.subscribe(subscription.topic, subscription.options);
}
} else {
//
// If no remaining active subscriptions to process,
// then handle subscription requests queued while offline.
//
var req = offlineSubscriptionQueue.shift();
if (!isUndefined(req)) {
_updateSubscriptionCache(req.type, req.topics, req.options);
if (req.type === 'subscribe') {
if (!isUndefined(req.callback)) {
device.subscribe(req.topics, req.options, req.callback);
} else {
device.subscribe(req.topics, req.options);
}
} else if (req.type === 'unsubscribe') {
device.unsubscribe(req.topics, req.callback);
}
} else {
//
// If no active or queued subscriptions remaining to process,
// then handle queued publish operations.
//
var offlinePublishMessage = offlinePublishQueue.shift();
if (!isUndefined(offlinePublishMessage)) {
device.publish(offlinePublishMessage.topic,
offlinePublishMessage.message,
offlinePublishMessage.options,
offlinePublishMessage.callback);
}
if (offlinePublishQueue.length === 0) {
//
// The subscription and offlinePublishQueue queues are fully drained,
// cancel the draining timer.
//
clearInterval(drainingTimer);
drainingTimer = null;
}
}
}
}