in action/kafkaFeedWeb.js [33:169]
function main(params) {
var promise = new Promise((resolve, reject) => {
// hold off initializing this until definitely needed
var db;
if (params.__ow_method === "post") {
var validatedParams;
return validateParameters(params)
.then(cleanParams => {
validatedParams = cleanParams;
console.log(`VALIDATED: ${JSON.stringify(validatedParams, null, 2)}`);
db = new Database(params.DB_URL, params.DB_NAME);
// do these in parallel!
return Promise.all([
db.ensureTriggerIsUnique(validatedParams.triggerName),
verifyTriggerAuth(validatedParams.triggerURL, params.authKey, true)
]);
})
.then(() => {
var workers = (params.workers || []);
return db.getTriggerAssignment(workers)
})
.then((worker) => {
validatedParams['worker'] = worker;
return db.recordTrigger(validatedParams);
})
.then(() => {
console.log('successfully wrote the trigger');
resolve(common.webResponse(200, validatedParams.uuid));
})
.catch(error => {
console.log(`Failed to write the trigger ${error}`);
// defaults to potentially be overridden
var statusCode = 500;
var body = error.toString();
if(error.validationError) {
statusCode = 400;
body = error.validationError;
} else if(error.authError) {
statusCode = 401;
body = error.authError;
}
resolve(common.webResponse(statusCode, body));
});
} else if (params.__ow_method === "get") {
const triggerURL = common.getTriggerURL(params.endpoint, params.triggerName);
return verifyTriggerAuth(triggerURL, params.authKey, true)
.then(() => {
db = new Database(params.DB_URL, params.DB_NAME);
return db.getTrigger(params.triggerName);
})
.then((triggerDoc) => {
var body = {
config: {
triggerName: triggerDoc.triggerName,
topic: triggerDoc.topic,
isJSONData: triggerDoc.isJSONData,
isBinaryValue: triggerDoc.isBinaryValue,
isBinaryKey: triggerDoc.isBinaryKey,
brokers: triggerDoc.brokers
},
status: {
active: triggerDoc.status.active,
dateChanged: moment(triggerDoc.status.dateChanged).utc().valueOf(),
dateChangedISO: moment(triggerDoc.status.dateChanged).utc().format(),
reason: triggerDoc.status.reason
}
}
resolve(common.webResponse(200, body, 'application/json'));
})
.catch(error => {
resolve(common.webResponse(500, error.toString()));
});
} else if (params.__ow_method === "put") {
const triggerURL = common.getTriggerURL(params.endpoint, params.triggerName);
return verifyTriggerAuth(triggerURL, params.authKey, true)
.then(() => {
db = new Database(params.DB_URL, params.DB_NAME);
return db.getTrigger(params.triggerName);
})
.then(triggerDoc => {
if (!triggerDoc.status.active) {
return resolve(common.webResponse(400, `${params.triggerName} cannot be updated because it is disabled`));
}
return common.performUpdateParameterValidation(params, triggerDoc)
.then(updatedParams => {
return db.disableTrigger(triggerDoc)
.then(() => db.getTrigger(params.triggerName))
.then(doc => db.updateTrigger(doc, updatedParams));
});
})
.then(() => {
console.log('successfully updated the trigger');
resolve(common.webResponse(200, 'updated trigger'));
})
.catch(error => {
console.log(`Failed to update trigger ${error}`);
var statusCode = 500;
var body = error.toString();
if (error.validationError) {
statusCode = 400;
body = error.validationError;
}
resolve(common.webResponse(statusCode, body));
});
} else if (params.__ow_method === "delete") {
const triggerURL = common.getTriggerURL(params.endpoint, params.triggerName);
return verifyTriggerAuth(triggerURL, params.authKey, false)
.then(() => {
db = new Database(params.DB_URL, params.DB_NAME);
return db.deleteTrigger(params.triggerName);
})
.then(() => {
console.log('successfully deleted the trigger');
resolve(common.webResponse(200, 'deleted trigger'));
})
.catch(error => {
console.log(`Failed to remove trigger ${error}`);
resolve(common.webResponse(500, error.toString()));
});
} else {
resolve(common.webResponse(400, 'unsupported lifecycleEvent'));
}
});
return promise;
}