in thing/index.js [127:192]
//
// Build every shadow topic described by topicSpecs for thingName and invoke
// device[devFunction] on the resulting topic list.
//
//   thingName   - key into thingShadows; also passed to buildThingShadowTopic
//   topicSpecs  - list of specs, each with an 'operations' and a 'statii' list;
//                 one topic is built per (operation, status) pair
//   devFunction - name of the device method to call; 'subscribe' gets special
//                 handling, any other value (presumably 'unsubscribe' - confirm
//                 against callers) is called with the topics and the callback
//   callback    - optional; for 'subscribe' it is called with (err) on error,
//                 with ('Not all subscriptions were granted', rejectedGrants)
//                 when any grant reports failure, or with no arguments on
//                 success; otherwise it is handed directly to the device method
//
this._handleSubscriptions = function(thingName, topicSpecs, devFunction, callback) {
   //
   // 128 is 0x80 - Failure from the MQTT lib.
   //
   var SUBACK_FAILURE = 128;

   //
   // Expand each spec into one topic per operation/status combination.
   //
   var topics = [];
   for (var specIdx = 0, specCount = topicSpecs.length; specIdx < specCount; specIdx++) {
      var spec = topicSpecs[specIdx];
      for (var opIdx = 0, opCount = spec.operations.length; opIdx < opCount; opIdx++) {
         for (var statIdx = 0, statCount = spec.statii.length; statIdx < statCount; statIdx++) {
            topics.push(buildThingShadowTopic(thingName,
               spec.operations[opIdx],
               spec.statii[statIdx]));
         }
      }
   }

   var shadow = thingShadows[thingName];
   if (shadow.debug === true) {
      console.log(devFunction + ' on ' + topics);
   }

   //
   // Inspects the SUBACK response and reports the outcome through callback.
   // Does nothing when no callback was supplied.
   //
   var onSubscribeAck = function(err, granted) {
      if (isUndefined(callback)) {
         return;
      }
      if (err) {
         callback(err);
         return;
      }
      // Collect the grants the broker refused.
      var rejected = granted.filter(function(grant) {
         return grant.qos === SUBACK_FAILURE;
      });
      if (rejected.length > 0) {
         callback('Not all subscriptions were granted', rejected);
         return;
      }
      // all subscriptions were granted
      callback();
   };

   //
   // Assemble the argument list: QoS options and the SUBACK check only apply
   // to subscribe; other device functions just receive the optional callback.
   //
   var callArgs;
   if (devFunction === 'subscribe') {
      callArgs = [topics, {
         qos: shadow.qos
      }, onSubscribeAck];
   } else if (!isUndefined(callback)) {
      callArgs = [topics, callback];
   } else {
      callArgs = [topics];
   }

   device[devFunction].apply(device, callArgs);
};