in components/base-workflow/packages/base-workflow-core/lib/runner/handler.js [79:329]
/**
 * Runs one iteration ("tick") of a workflow loop and translates the resulting decision into an output object.
 *
 * The first argument is destructured into:
 *  - `input`: the raw workflow input; it is wrapped in a `WorkflowInput`.
 *  - `meta`:  read for `wid`, `wrv` and `sid`; it is also handed to the `WorkflowPayload`.
 *  - `loop`:  state carried over from a previous iteration: `memento` (workflow loop), `wp` (workflow payload),
 *             `ssp` (step state provider), `slp` (step loop provider) and `sid`.
 *
 * When neither `meta.sid` nor `loop.sid` is provided, a new workflow instance is created from `meta.wid` and
 * `meta.wrv`; otherwise the existing instance is looked up by its id.
 *
 * None of the objects received in the first argument is modified by this function.
 *
 * @param {object} [event] - Object holding the optional `input`, `meta` and `loop` parts described above.
 * @param {*} _context - Accepted but never read by this function.
 * @param {function(ServicesContainer): Promise<void>} registerServices - Called (and awaited) with the services
 *   container so that services can be registered before the container is initialized.
 * @returns {Promise<object>} `{ shouldEnd: 1 }` when the "workflowsEnabled" setting is false. Otherwise an object
 *   with the flags `shouldWait`, `shouldLoop`, `shouldPass` and `shouldFail` (each 0 or 1), the mementos `memento`,
 *   `slp`, `wp` and `ssp`, plus `sid` (only when `meta.sid` was not provided), `wait` (for 'wait'/'pause'
 *   decisions) and `error` (for 'fail' decisions: the decision without its `type`).
 * @throws {Error} When a new instance has to be created but `meta` lacks "wid" or "wrv", when `tick()` returns an
 *   empty decision, or when the decision type is not one of 'loop', 'wait', 'pause', 'pass' or 'fail'.
 */
async function handler({ input = {}, meta = {}, loop = {} } = {}, _context, registerServices) {
  const { memento = {}, wp = {}, ssp = {}, slp = {}, sid: sidFromLoop } = loop;

  // Register services
  const container = new ServicesContainer(['settings', 'log']);
  await registerServices(container);
  await container.initServices();

  // Check circular dependencies
  container.validate();

  const log = await container.find('log');
  const settings = await container.find('settings');

  // NOTE(review): the second argument is presumably the value used when the setting is absent - confirm.
  const workflowsEnabled = settings.optionalBoolean(settingKeys.workflowsEnabled, true);
  if (!workflowsEnabled) {
    log.info('Skipping the processing of the workflows because the setting "workflowsEnabled" is false');
    return { shouldEnd: 1 };
  }

  const { wid, wrv, sid: sidFromMeta } = meta;
  // The sid found in "meta" takes precedence over the one carried in "loop"
  let sid = sidFromMeta || sidFromLoop;

  const workflowInstanceService = await container.find('workflowInstanceService');
  let instance;

  // Wrap the raw input with WorkflowInput
  const wfInput = new WorkflowInput({ input });

  const requestContext = getSystemRequestContext();
  if (!sid) {
    // No instance id is known yet, so a new workflow instance has to be created
    const wrvParsed = toNumber(wrv);
    if (_.isEmpty(wid) || _.isUndefined(wrvParsed)) {
      throw new Error('The "meta" part of the input must contain "wid" and "wrv"');
    }
    instance = await workflowInstanceService.createInstance(
      requestContext,
      {
        workflowId: wid,
        workflowVer: wrvParsed,
        status: 'in_progress',
      },
      wfInput,
    );
    sid = instance.id;
  } else {
    instance = await workflowInstanceService.mustFindInstance(requestContext, { id: sid });
  }

  // TODO: Check if runSpec target is supported
  const workflowInstance = new WorkflowInstance({ workflowInstance: instance });

  // A convenient function to allow us to wrap a fn with catchIfErrorAsync
  const safeCall = fn => async (...params) => catchIfErrorAsync(async () => fn(...params));

  // Get the steps registry and construct the classResolver, which maps a step to its implementation class
  // (or undefined when the registry has no entry for the step)
  const stepRegistry = await container.find('stepRegistryService');
  const classResolver = async ({ stepTemplateId, stepTemplateVer }) => {
    const entry = await stepRegistry.findStep({
      id: stepTemplateId,
      v: stepTemplateVer,
    });
    return entry ? entry.implClass : undefined;
  };

  // ----
  // Create and restore all the workflowLoop dependencies/helpers
  // ----

  // Create stepStateProvider
  const stepStateProvider = new StepStateProvider();
  stepStateProvider.setMemento(ssp);

  // Create workflowReporter
  const workflowReporter = new WorkflowReporter({
    workflowInstance,
    log,
    workflowInstanceService,
  });

  // Create and restore workflowPayload
  const workflowPayload = new WorkflowPayload({
    workflowInstance,
    meta,
    input: wfInput,
  });
  // When the carried-over payload memento has no (or an empty) "m" part, fall back to the meta exposed by the
  // newly created payload. A new object is built so that the caller-owned "loop.wp" is left untouched.
  const payloadMemento = { ...wp, m: _.isEmpty(wp.m) ? workflowPayload.meta : wp.m };
  workflowPayload.setMemento(payloadMemento);

  // Create stepClassProvider
  const stepClassProvider = {
    getClass: async ({ step, workflowStatus }) => {
      const Class = await classResolver(step);
      if (_.isNil(Class)) return undefined;

      const stepReporter = workflowReporter.getStepReporter({ step });
      const stepState = await stepStateProvider.getStepState({ step });

      const impl = new Class({
        input: wfInput,
        workflowInstance,
        container,
        workflowPayload,
        step,
        stepReporter,
        stepState,
        workflowStatus,
      });

      return impl;
    },
  };

  // Create and restore stepLoopProvider
  const stepLoopProvider = new StepLoopProvider({
    workflowInstance,
    stepClassProvider,
  });
  stepLoopProvider.setMemento(slp);

  // Register with the step loop provider event and the step loop events
  stepLoopProvider.on(
    'stepLoopCreated',
    safeCall(async stepLoop => {
      const step = stepLoop.step;
      const reporter = workflowReporter.getStepReporter({ step });

      stepLoop
        .on(
          'stepLoopSkipped',
          safeCall(async () => reporter.stepSkipped()),
        )
        .on(
          'stepLoopStarted',
          safeCall(async () => reporter.stepStarted()),
        )
        .on(
          'stepLoopMethodCall',
          safeCall(async name => reporter.print(`StepLoop - calling ${name}()`)),
        )
        .on(
          'stepLoopQueueAdd',
          safeCall(async msg => reporter.print(msg)),
        )
        .on(
          'stepLoopStepPausing',
          safeCall(async reasonForPause => reporter.stepPaused(reasonForPause)),
        )
        .on(
          'stepLoopStepResuming',
          safeCall(async reasonForResume => reporter.stepResumed(reasonForResume)),
        )
        .on(
          'stepLoopStepMaxPauseReached',
          safeCall(async () => reporter.stepMaxPauseReached()),
        )
        .on(
          'stepLoopRequestingGoTo',
          safeCall(async () => {
            // The step requested the workflow to continue from another step, so the currently executing step is
            // treated as passed (or "done" - as it has done its job of requesting the jump to the other step)
            return reporter.stepPassed();
          }),
        )
        .on(
          'stepLoopPassed',
          safeCall(async () => reporter.stepPassed()),
        )
        .on(
          'stepLoopFailed',
          safeCall(async (...params) => reporter.stepFailed(...params)),
        )
        // Unlike the reporting listeners above, the two listeners below are not wrapped with safeCall
        .on('beforeStepLoopTick', async () => {
          const stepState = await stepStateProvider.getStepState({ step });
          await stepState.load();
          await workflowPayload.load();
        })
        .on('afterStepLoopTick', async () => {
          const stepState = await stepStateProvider.getStepState({ step });
          await stepState.save();
          await workflowPayload.save();
        });
    }),
  );

  // Create and restore the workflowLoop
  const workflowLoop = new WorkflowLoop({ workflowInstance, stepLoopProvider });
  workflowLoop.setMemento(memento);

  // Register with the workflow loop events
  workflowLoop
    .on(
      'workflowStarted',
      safeCall(async () => workflowReporter.workflowStarted()),
    )
    .on(
      'workflowPaused',
      safeCall(async () => workflowReporter.workflowPaused()),
    )
    .on(
      'workflowResuming',
      safeCall(async () => workflowReporter.workflowResuming()),
    )
    .on(
      'workflowPassed',
      safeCall(async () => workflowReporter.workflowPassed()),
    )
    .on(
      'workflowFailed',
      safeCall(async (...params) => workflowReporter.workflowFailed(...params)),
    );

  // Run one iteration of the workflowLoop
  const decision = await workflowLoop.tick();

  // Deal with the output
  if (_.isEmpty(decision)) {
    throw new Error("The workflow loop tick() method didn't return a decision object");
  }

  // All flags start at 0; exactly one of them is switched to 1 below, based on the decision type
  const output = {
    shouldWait: 0,
    shouldLoop: 0,
    shouldPass: 0,
    shouldFail: 0,
    memento: workflowLoop.getMemento(),
    slp: stepLoopProvider.getMemento(),
    wp: workflowPayload.getMemento(),
    ssp: stepStateProvider.getMemento(),
  };

  // The sid is only included in the output when it was not supplied via "meta"
  if (!sidFromMeta) output.sid = sid;

  switch (decision.type) {
    case 'loop':
      output.shouldLoop = 1;
      break;
    case 'wait':
    case 'pause':
      output.shouldWait = 1;
      output.wait = decision.wait;
      break;
    case 'pass':
      output.shouldPass = 1;
      break;
    case 'fail':
      output.shouldFail = 1;
      // Everything in the decision except its type is reported as the error
      output.error = _.omit(decision, ['type']);
      break;
    default:
      throw new Error(`The workflow loop tick() method returned unsupported decision type of "${decision.type}"`);
  }

  return output;
}