in thing/index.js [197:313]
//
// Handle a message received for a thing shadow.
//
// Parameters:
//    thingName       - key of the shadow's entry in 'thingShadows'
//    operation       - shadow operation the message belongs to (this function
//                      only distinguishes 'get' and 'delete' from the rest)
//    operationStatus - 'accepted', 'rejected', or 'delta'
//    payload         - raw message payload; payload.toString() must yield the
//                      JSON text of an object
//
// Depending on the message, one of the following events is emitted:
//    'delta'              - (thingName, stateObject)
//    'foreignStateChange' - (thingName, operation, stateObject)
//    'status'             - (thingName, operationStatus, clientToken, stateObject)
//
// Payloads which are not valid JSON, or which parse to something other than
// an object, are dropped (and logged when deviceOptions.debug is true).
//
// NOTE(review): this function dereferences thingShadows[thingName] without
// checking that it exists; presumably the caller only dispatches messages for
// registered things -- confirm against the call site.
//
this._handleMessages = function(thingName, operation, operationStatus, payload) {
   var stateObject;
   try {
      stateObject = JSON.parse(payload.toString());
   } catch (err) {
      if (deviceOptions.debug === true) {
         console.error('failed parsing JSON \'' + payload.toString() + '\', ' + err);
      }
      return;
   }
   //
   // JSON.parse() also succeeds for 'null' and for bare primitives (numbers,
   // strings, booleans).  None of these is a shadow document, and 'null'
   // would make the property accesses below throw, so drop them here.
   //
   if ((stateObject === null) || (typeof stateObject !== 'object')) {
      if (deviceOptions.debug === true) {
         console.error('ignoring non-object JSON payload \'' + payload.toString() + '\'');
      }
      return;
   }
   var thingShadow = thingShadows[thingName];
   var clientToken = stateObject.clientToken;
   var version = stateObject.version;
   //
   // Remove 'clientToken' from the stateObject; it is internal to this class
   // and is passed separately with 'status' events.  'version' is deliberately
   // left in place so that the shadow version is exposed to the application.
   //
   delete stateObject.clientToken;
   //
   // Update the thing version on every accepted or delta message which
   // contains it.
   //
   if ((!isUndefined(version)) && (operationStatus !== 'rejected')) {
      //
      // The thing shadow version is incremented by AWS IoT and should always
      // increase.  Do not update our local version if the received version is
      // less than our version.
      //
      if ((isUndefined(thingShadow.version)) ||
         (version >= thingShadow.version)) {
         thingShadow.version = version;
      } else {
         //
         // We've received a message from AWS IoT with a version number lower than
         // we would expect.  There are two things that can cause this:
         //
         //  1) The shadow has been deleted (version # reverts to 1 in this case.)
         //  2) The message has arrived out-of-order.
         //
         // For case 1) we can look at the operation to determine that this
         // is the case and notify the client if appropriate.  For case 2,
         // we will not process it unless the client has specifically expressed
         // an interest in these messages by setting 'discardStale' to false.
         //
         if (operation !== 'delete' && thingShadow.discardStale === true) {
            if (deviceOptions.debug === true) {
               console.warn('out-of-date version \'' + version + '\' on \'' +
                  thingName + '\' (local version \'' +
                  thingShadow.version + '\')');
            }
            return;
         }
      }
   }
   //
   // If this is a 'delta' message, emit an event for it and return.
   //
   if (operationStatus === 'delta') {
      this.emit('delta', thingName, stateObject);
      return;
   }
   //
   // only accepted/rejected messages past this point
   // ===============================================
   // If this is an unknown clientToken (i.e., it doesn't match the client
   // token of an operation we have outstanding), the shadow has been modified
   // by another client.  If it's an update/accepted or delete/accepted, notify
   // the client; anything else carrying a foreign token is ignored.
   //
   if (isUndefined(thingShadow.clientToken) ||
      thingShadow.clientToken !== clientToken) {
      if ((operationStatus === 'accepted') && (operation !== 'get')) {
         this.emit('foreignStateChange', thingName, operation, stateObject);
      }
      return;
   }
   //
   // A response to our own operation has been received, so cancel any
   // outstanding timeout on this thingName/clientToken and delete the
   // timeout handle.
   //
   clearTimeout(thingShadow.timeout);
   delete thingShadow.timeout;
   //
   // Delete the operation's client token.
   //
   delete thingShadow.clientToken;
   //
   // Mark this operation as complete.
   //
   thingShadow.pending = false;
   //
   // Unsubscribe from the 'accepted' and 'rejected' sub-topics unless we are
   // persistently subscribed to this thing shadow.
   //
   if (thingShadow.persistentSubscribe === false) {
      this._handleSubscriptions(thingName, [{
         operations: [operation],
         statii: ['accepted', 'rejected']
      }], 'unsubscribe');
   }
   //
   // Emit an event detailing the operation status; the clientToken is included
   // as an argument so that the application can correlate status events to
   // the operations they are associated with.
   //
   this.emit('status', thingName, operationStatus, clientToken, stateObject);
};