in thing/index.js [373:527]
//
// Start a shadow operation on a registered thing.
//
// Parameters:
//    thingName   - name of the thing; must be a key of 'thingShadows'
//    operation   - operation name; used to build the publish topic and the
//                  'accepted'/'rejected' subscription spec
//    stateObject - object to publish.  Its 'clientToken' property (and its
//                  'version' property, when versioning is enabled and a
//                  version is known) are written onto it before it is
//                  published.  If undefined, a client token is still
//                  generated but nothing is published.
//
// Returns the client token associated with the operation, or null if the
// thing is unknown or an operation is already pending on it.  A 'timeout'
// event carrying (thingName, clientToken) is emitted if the timer armed
// here fires.
//
this._thingOperation = function(thingName, operation, stateObject) {
   if (!thingShadows.hasOwnProperty(thingName)) {
      if (deviceOptions.debug === true) {
         console.error('attempting to ' + operation + ' unknown thing: ', thingName);
      }
      return null;
   }
   //
   // Don't allow a new operation if an existing one is still in process.
   //
   if (thingShadows[thingName].pending !== false) {
      if (deviceOptions.debug === true) {
         console.error(operation + ' still in progress on thing: ', thingName);
      }
      return null;
   }
   //
   // If not provided, construct a clientToken from the clientId and a rolling
   // operation count.  The clientToken is transmitted in any published stateObject
   // and is returned to the caller for each operation.  Applications can use
   // clientToken values to correlate received responses or timeouts with
   // the original operations.
   //
   // The token is resolved *before* the thing is marked as pending so that a
   // failure here cannot leave the thing stuck in the pending state.
   //
   var clientToken;
   if (!isUndefined(stateObject) && !isUndefined(stateObject.clientToken)) {
      clientToken = stateObject.clientToken;
   } else {
      //
      // AWS IoT restricts client tokens to 64 bytes, so use only the last 48
      // characters of the client ID when constructing a client token.
      // slice(-48) yields the whole string when it is 48 characters or shorter.
      //
      clientToken = deviceOptions.clientId.slice(-48) + '-' + operationCount++;
   }
   //
   // Starting a new operation.  Remember the client token for it; it will be
   // deleted when the operation completes or times out.
   //
   thingShadows[thingName].pending = true;
   thingShadows[thingName].clientToken = clientToken;
   var publishTopic = buildThingShadowTopic(thingName,
      operation);
   //
   // Build a fresh subscription spec covering the 'accepted' and 'rejected'
   // sub-topics of this operation.
   //
   var buildTopicSpecs = function() {
      return [{
         operations: [operation],
         statii: ['accepted', 'rejected']
      }];
   };
   //
   // Publish 'stateObject' (if defined) to the publish topic for this
   // thingName+operation.  Shared by the persistent and non-persistent
   // subscription paths.
   //
   var publishStateObject = function() {
      var shadow = thingShadows[thingName];
      //
      // When invoked from the subscription callback the thing may no longer
      // be present; check before touching any of its properties.
      //
      if (isUndefined(shadow) || isUndefined(stateObject)) {
         return;
      }
      //
      // Add the version # (if known and versioning is enabled) and
      // 'clientToken' properties to the stateObject.
      //
      if (!isUndefined(shadow.version) && shadow.enableVersioning) {
         stateObject.version = shadow.version;
      }
      stateObject.clientToken = clientToken;
      device.publish(publishTopic,
         JSON.stringify(stateObject), {
            qos: shadow.qos
         });
      if (shadow.debug === true) {
         console.log('publishing \'' + JSON.stringify(stateObject) +
            ' on \'' + publishTopic + '\'');
      }
   };
   //
   // Set a timeout beyond which the 'accepted' and 'rejected' sub-topics will
   // be unsubscribed if the operation has not finished by then.
   //
   thingShadows[thingName].timeout = setTimeout(function() {
      //
      // Timed-out.  Unsubscribe from the 'accepted' and 'rejected' sub-topics unless
      // we are persistently subscribing to this thing shadow.
      //
      if (thingShadows[thingName].persistentSubscribe === false) {
         that._handleSubscriptions(thingName, buildTopicSpecs(), 'unsubscribe');
      }
      //
      // Mark this operation as complete.
      //
      thingShadows[thingName].pending = false;
      //
      // Delete the timeout handle and client token for this thingName.
      //
      delete thingShadows[thingName].timeout;
      delete thingShadows[thingName].clientToken;
      //
      // Emit an event for the timeout; the clientToken is included as an argument
      // so that the application can correlate timeout events to the operations
      // they are associated with.
      //
      that.emit('timeout', thingName, clientToken);
   }, operationTimeout);
   //
   // Subscribe to the 'accepted' and 'rejected' sub-topics unless we are
   // persistently subscribing, in which case we can publish to the topic immediately
   // since we are already subscribed to all applicable sub-topics.
   //
   if (thingShadows[thingName].persistentSubscribe === false) {
      this._handleSubscriptions(thingName, buildTopicSpecs(), 'subscribe',
         function(err, failedTopics) {
            if (!isUndefined(err) || !isUndefined(failedTopics)) {
               //
               // Nothing is published; the operation stays pending until the
               // timeout armed above fires.
               //
               console.warn('failed subscription to accepted/rejected topics');
               return;
            }
            publishStateObject();
         });
   } else {
      publishStateObject();
   }
   return clientToken; // return the clientToken to the caller
};