this._thingOperation = function()

in thing/index.js [373:527]


   this._thingOperation = function(thingName, operation, stateObject) {
      var rc = null;

      if (thingShadows.hasOwnProperty(thingName)) {
         //
         // Don't allow a new operation if an existing one is still in process.
         //
         if (thingShadows[thingName].pending === false) {
            //
            // Starting a new operation
            //
            thingShadows[thingName].pending = true;
            //
            // If not provided, construct a clientToken from the clientId and a rolling 
            // operation count.  The clientToken is transmitted in any published stateObject 
            // and is returned to the caller for each operation.  Applications can use
            // clientToken values to correlate received responses or timeouts with
            // the original operations.
            //
            var clientToken;

            if (isUndefined(stateObject.clientToken)) {
               //
               // AWS IoT restricts client tokens to 64 bytes, so use only the last 48
               // characters of the client ID when constructing a client token.
               //
               var clientIdLength = deviceOptions.clientId.length;

               if (clientIdLength > 48) {
                  clientToken = deviceOptions.clientId.substr(clientIdLength - 48) + '-' + operationCount++;
               } else {
                  clientToken = deviceOptions.clientId + '-' + operationCount++;
               }
            } else {
               clientToken = stateObject.clientToken;
            }
            //
            // Remember the client token for this operation; it will be
            // deleted when the operation completes or times out.
            //
            thingShadows[thingName].clientToken = clientToken;

            var publishTopic = buildThingShadowTopic(thingName,
               operation);
            //
            // Subscribe to the 'accepted' and 'rejected' sub-topics for this get
            // operation and set a timeout beyond which they will be unsubscribed if 
            // no messages have been received for either of them.
            //
            thingShadows[thingName].timeout = setTimeout(
               function(thingName, clientToken) {
                  //
                  // Timed-out.  Unsubscribe from the 'accepted' and 'rejected' sub-topics unless
                  // we are persistently subscribing to this thing shadow.
                  //
                  if (thingShadows[thingName].persistentSubscribe === false) {
                     that._handleSubscriptions(thingName, [{
                        operations: [operation],
                        statii: ['accepted', 'rejected']
                     }], 'unsubscribe');
                  }
                  //
                  // Mark this operation as complete.
                  //
                  thingShadows[thingName].pending = false;

                  //
                  // Delete the timeout handle and client token for this thingName.
                  //
                  delete thingShadows[thingName].timeout;
                  delete thingShadows[thingName].clientToken;

                  //
                  // Emit an event for the timeout; the clientToken is included as an argument
                  // so that the application can correlate timeout events to the operations
                  // they are associated with.
                  //
                  that.emit('timeout', thingName, clientToken);
               }, operationTimeout,
               thingName, clientToken);
            //
            // Subscribe to the 'accepted' and 'rejected' sub-topics unless we are
            // persistently subscribing, in which case we can publish to the topic immediately
            // since we are already subscribed to all applicable sub-topics.
            //
            if (thingShadows[thingName].persistentSubscribe === false) {
               this._handleSubscriptions(thingName, [{
                     operations: [operation],
                     statii: ['accepted', 'rejected'],
                  }], 'subscribe',
                  function(err, failedTopics) {
                     if (!isUndefined(err) || !isUndefined(failedTopics)) {
                        console.warn('failed subscription to accepted/rejected topics');
                        return;
                     }

                     //
                     // If 'stateObject' is defined, publish it to the publish topic for this
                     // thingName+operation.
                     //
                     if (!isUndefined(stateObject)) {
                        //
                        // Add the version # (if known and versioning is enabled) and 
                        // 'clientToken' properties to the stateObject.
                        //
                        if (!isUndefined(thingShadows[thingName].version) &&
                           thingShadows[thingName].enableVersioning) {
                           stateObject.version = thingShadows[thingName].version;
                        }
                        stateObject.clientToken = clientToken;

                        device.publish(publishTopic,
                           JSON.stringify(stateObject), {
                              qos: thingShadows[thingName].qos
                           });
                        if (!(isUndefined(thingShadows[thingName])) &&
                           thingShadows[thingName].debug === true) {
                           console.log('publishing \'' + JSON.stringify(stateObject) +
                              ' on \'' + publishTopic + '\'');
                        }
                     }
                  });
            } else {
               //
               // Add the version # (if known and versioning is enabled) and 
               // 'clientToken' properties to the stateObject.
               //
               if (!isUndefined(thingShadows[thingName].version) &&
                  thingShadows[thingName].enableVersioning) {
                  stateObject.version = thingShadows[thingName].version;
               }
               stateObject.clientToken = clientToken;

               device.publish(publishTopic,
                  JSON.stringify(stateObject), {
                     qos: thingShadows[thingName].qos
                  });
               if (thingShadows[thingName].debug === true) {
                  console.log('publishing \'' + JSON.stringify(stateObject) +
                     ' on \'' + publishTopic + '\'');
               }
            }
            rc = clientToken; // return the clientToken to the caller
         } else {
            if (deviceOptions.debug === true) {
               console.error(operation + ' still in progress on thing: ', thingName);
            }
         }
      } else {
         if (deviceOptions.debug === true) {
            console.error('attempting to ' + operation + ' unknown thing: ', thingName);
         }
      }
      return rc;
   };