in jobs/index.js [57:327]
function jobsClient(options) {
//
// Force instantiation using the 'new' operator; this will cause inherited
// constructors (e.g. the 'events' class) to be called.
//
if (!(this instanceof jobsClient)) {
return new jobsClient(options);
}
//
// A copy of 'this' for use inside of closures
//
var that = this;
//
// Track job subscriptions
//
// [
// {
// "thingName": "string",
// "operations": [
// {
// "operationName": "string", // if null then treat as default
// "currentJob": job,
// "callback": callback
// }
// ]
// }
// ]
//
var jobSubscriptions = [];
//
// Instantiate the device
//
var device = deviceModule.DeviceClient(options);
//
// Private function to update job execution status for given thing
//
this._updateJobStatus = function(thingName, job, status, statusDetails, callback) {
// Check for omitted statusDetails and update parameters
if (typeof statusDetails === "function") {
callback = statusDetails;
statusDetails = undefined;
}
if ((!isUndefined(options)) && (options.debug === true)) {
console.log('updateJobStatus:', { thingName: thingName, jobId: job.id, status: status, statusDetails: statusDetails });
}
device.publish(buildJobTopic(thingName, job.id, 'update'), JSON.stringify({ status: status, statusDetails: statusDetails}), null, function(err){
if (isUndefined(err)) {
job.status = { status: status, statusDetails: statusDetails };
}
if (!isUndefined(callback)) {
callback(err);
}
});
}
//
// Private function to build job object for passing to callback supplied in subscribeToJobs
//
this._buildJobObject = function(thingName, jobExecution) {
if (isUndefined(jobExecution) || isUndefined(jobExecution.jobId)) {
return null;
}
var job = {};
job.id = jobExecution.jobId;
job.document = jobExecution.jobDocument;
job.operation = job.document.operation;
job.status = { status: jobExecution.status, statusDetails: jobExecution.statusDetails };
job.inProgress = function(statusDetails, callback) {
that._updateJobStatus(thingName, job, 'IN_PROGRESS', statusDetails, callback);
}
job.failed = function(statusDetails, callback) {
that._updateJobStatus(thingName, job, 'FAILED', statusDetails, callback);
}
job.succeeded = function(statusDetails, callback) {
that._updateJobStatus(thingName, job, 'SUCCEEDED', statusDetails, callback);
}
return job;
}
//
// Private function to handle job messages and process them accordingly
//
this._handleMessages = function(topic, payload) {
var topicTokens = topic.split('/');
// If not a job topic emit to application and return
if (!isJobTopic(topicTokens)) {
that.emit('message', topic, payload);
return;
}
var thingName = topicTokens[2];
var thing = jobSubscriptions.find(function(elem) {
return elem.thingName === thingName;
});
// Do nothing if thing not found in job subscriptions
if (isUndefined(thing)) {
return;
}
var jobExecutionData = {};
try {
jobExecutionData = JSON.parse(payload.toString());
} catch (err) {
if (options.debug === true) {
console.error('failed parsing JSON \'' + payload.toString() + '\', ' + err);
}
return;
}
if (isUndefined(jobExecutionData.execution) ||
isUndefined(jobExecutionData.execution.jobId) ||
isUndefined(jobExecutionData.execution.jobDocument)) {
return;
}
var operationName = jobExecutionData.execution.jobDocument.operation;
var operation = thing.operations.find(function(elem) {
return (isUndefined(operationName) ? isUndefined(elem.operationName) : operationName === elem.operationName);
});
// If operation subscription not found by operation name then look for default operation subscription
if (isUndefined(operation)) {
operation = thing.operations.find(function(elem) { return (isUndefined(elem.operationName)); });
if (isUndefined(operation)) {
return;
}
}
operation.callback(null, that._buildJobObject(thingName, jobExecutionData.execution));
}
this.subscribeToJobs = function(thingName, operationName, callback) {
// Check for omitted optional operationName and fixup parameters
if (isUndefined(callback)) {
callback = operationName;
operationName = null;
}
if ((!isUndefined(options)) && (options.debug === true)) {
console.log('subscribeToJobs:', { thingName: thingName, operationName: operationName });
}
var thing = jobSubscriptions.find(function(elem) {
return elem.thingName === thingName;
});
// Check for previously unseen thing and add to job subscriptions
if (isUndefined(thing)) {
thing = { thingName: thingName, operations: [] };
jobSubscriptions.push(thing);
device.subscribe([ buildJobTopic(thingName, '$next/get/accepted'), buildJobTopic(thingName, 'notify-next') ], function(err, granted) {
if (!isUndefined(err)) {
callback(err);
}
});
}
// Find existing subscription for the given operationName
var operation = thing.operations.find(function(elem) {
return (isUndefined(operationName) ? isUndefined(elem.operationName) : operationName === elem.operationName);
});
// If existing subscription found then update callback, otherwise create new entry in the thing's operations array
if (!isUndefined(operation)) {
operation.callback = callback;
} else {
operation = { operationName: operationName, callback: callback };
thing.operations.push(operation);
}
}
this.unsubscribeFromJobs = function(thingName, operationName, callback) {
// Check for omitted optional operationName and fixup parameters
if (isUndefined(callback)) {
callback = operationName;
operationName = null;
}
if ((!isUndefined(options)) && (options.debug === true)) {
console.log('unsubscribeFromJobs:', { thingName: thingName, operationName: operationName });
}
var iThing = jobSubscriptions.findIndex(function(elem) {
return elem.thingName === thingName;
});
var notFoundError = new Error('subscription not found for given thing');
// Check for previously unseen thing and add to job subscriptions and publish to get to retrieve first job to be executed
if (iThing < 0) {
callback(notFoundError);
return;
}
var iOperation = jobSubscriptions[iThing].operations.findIndex(function (elem) {
return (isUndefined(operationName) ? isUndefined(elem.operationName) : operationName === elem.operationName);
});
if (iOperation < 0) {
callback(notFoundError);
return;
}
jobSubscriptions[iThing].operations.splice(iOperation, 1);
if (jobSubscriptions[iThing].operations.length === 0) {
jobSubscriptions.splice(iThing, 1);
device.unsubscribe([ buildJobTopic(thingName, '$next/get/accepted'), buildJobTopic(thingName, 'notify-next') ], callback);
return;
}
callback();
}
this.startJobNotifications = function(thingName, callback) {
if ((!isUndefined(options)) && (options.debug === true)) {
console.log('startJobNotifications:', { thingName: thingName });
}
device.publish(buildJobTopic(thingName, '$next', 'get'), '{}', callback);
}
device.on('connect', function() {
that.emit('connect');
});
device.on('close', function() {
that.emit('close');
});
device.on('reconnect', function() {
that.emit('reconnect');
});
device.on('offline', function() {
that.emit('offline');
});
device.on('error', function(error) {
that.emit('error', error);
});
device.on('message', that._handleMessages);
this.publish = device.publish;
this.subscribe = device.subscribe;
this.unsubscribe = device.unsubscribe;
this.end = device.end;
this.handleMessage = device.handleMessage;
this.updateWebSocketCredentials = device.updateWebSocketCredentials;
//
// Used for integration testing only
//
this.simulateNetworkFailure = device.simulateNetworkFailure;
}