async function handler()

in components/base-workflow/packages/base-workflow-core/lib/runner/handler.js [79:329]


async function handler({ input = {}, meta = {}, loop = {} } = {}, _context, registerServices) {
  // eslint-disable-line no-unused-vars
  const { memento = {}, wp = {}, ssp = {}, slp = {}, sid: sidFromLoop } = loop;

  // Register services
  const container = new ServicesContainer(['settings', 'log']);
  await registerServices(container);
  await container.initServices();

  // Check circular dependencies
  container.validate();

  const log = await container.find('log');
  const settings = await container.find('settings');
  const workflowsEnabled = settings.optionalBoolean(settingKeys.workflowsEnabled, true);

  if (!workflowsEnabled) {
    log.info('Skipping the processing of the workflows because the setting "workflowsEnabled" is false');
    return { shouldEnd: 1 };
  }

  const { wid, wrv, sid: sidFromMeta } = meta;
  let wrvParsed;
  let sid = sidFromMeta || sidFromLoop;
  const workflowInstanceService = await container.find('workflowInstanceService');
  let instance;

  // Wrap the raw input with WorkflowInput
  const wfInput = new WorkflowInput({ input });

  const requestContext = getSystemRequestContext();
  if (!sid) {
    wrvParsed = toNumber(wrv);
    if (_.isEmpty(wid) || _.isUndefined(wrvParsed)) {
      throw new Error('The "meta" part of the input must contain "wid" and "wrv"');
    }
    instance = await workflowInstanceService.createInstance(
      requestContext,
      {
        workflowId: wid,
        workflowVer: wrvParsed,
        status: 'in_progress',
      },
      wfInput,
    );
    sid = instance.id;
  } else {
    instance = await workflowInstanceService.mustFindInstance(requestContext, { id: sid });
  }

  // TODO: Check if runSpec target is supported
  const workflowInstance = new WorkflowInstance({ workflowInstance: instance });

  // A convenient function to allow us to wrap a fn with catchIfErrorAsync
  const safeCall = fn => async (...params) => catchIfErrorAsync(async () => fn(...params));

  // Get the steps registry and register the steps and construct the classResolver
  const stepRegistry = await container.find('stepRegistryService');
  const classResolver = async ({ stepTemplateId, stepTemplateVer }) => {
    const entry = await stepRegistry.findStep({
      id: stepTemplateId,
      v: stepTemplateVer,
    });
    return entry ? entry.implClass : undefined;
  };

  // ----
  // Create and restore all the workflowLoop dependencies/helpers
  // ----

  // Create stepStateProvider
  const stepStateProvider = new StepStateProvider();
  stepStateProvider.setMemento(ssp);

  // Create workflowReporter
  const workflowReporter = new WorkflowReporter({
    workflowInstance,
    log,
    workflowInstanceService,
  });

  // Create and restore workflowPayload
  const workflowPayload = new WorkflowPayload({
    workflowInstance,
    meta,
    input: wfInput,
  });
  wp.m = !wp.m || _.isEmpty(wp.m) ? workflowPayload.meta : wp.m;
  workflowPayload.setMemento(wp);

  // Create stepClassProvider
  const stepClassProvider = {
    getClass: async ({ step, workflowStatus }) => {
      const Class = await classResolver(step);
      if (_.isNil(Class)) return undefined;
      const stepReporter = workflowReporter.getStepReporter({ step });
      const stepState = await stepStateProvider.getStepState({ step });
      const impl = new Class({
        input: wfInput,
        workflowInstance,
        container,
        workflowPayload,
        step,
        stepReporter,
        stepState,
        workflowStatus,
      });
      return impl;
    },
  };

  // Create and restore stepLoopProvider
  const stepLoopProvider = new StepLoopProvider({
    workflowInstance,
    stepClassProvider,
  });
  stepLoopProvider.setMemento(slp);

  // Register with the step loop provider event and the step loop events
  stepLoopProvider.on(
    'stepLoopCreated',
    safeCall(async stepLoop => {
      const step = stepLoop.step;
      const reporter = workflowReporter.getStepReporter({ step });
      stepLoop
        .on(
          'stepLoopSkipped',
          safeCall(async () => reporter.stepSkipped()),
        )
        .on(
          'stepLoopStarted',
          safeCall(async () => reporter.stepStarted()),
        )
        .on(
          'stepLoopMethodCall',
          safeCall(async name => reporter.print(`StepLoop - calling ${name}()`)),
        )
        .on(
          'stepLoopQueueAdd',
          safeCall(async msg => reporter.print(msg)),
        )
        .on(
          'stepLoopStepPausing',
          safeCall(async reasonForPause => reporter.stepPaused(reasonForPause)),
        )
        .on(
          'stepLoopStepResuming',
          safeCall(async reasonForResume => reporter.stepResumed(reasonForResume)),
        )
        .on(
          'stepLoopStepMaxPauseReached',
          safeCall(async () => reporter.stepMaxPauseReached()),
        )
        .on(
          'stepLoopRequestingGoTo',
          safeCall(async () => {
            // The step requested WF to execute from other step so the currently executing step is treated as passed
            // (or "done" - as it has done it's job of requesting to executing from other step)
            return reporter.stepPassed();
          }),
        )
        .on(
          'stepLoopPassed',
          safeCall(async () => reporter.stepPassed()),
        )
        .on(
          'stepLoopFailed',
          safeCall(async (...params) => reporter.stepFailed(...params)),
        )
        .on('beforeStepLoopTick', async () => {
          const stepState = await stepStateProvider.getStepState({ step });
          await stepState.load();
          await workflowPayload.load();
        })
        .on('afterStepLoopTick', async () => {
          const stepState = await stepStateProvider.getStepState({ step });
          await stepState.save();
          await workflowPayload.save();
        });
    }),
  );

  // Create and restore the workflowLoop
  const workflowLoop = new WorkflowLoop({ workflowInstance, stepLoopProvider });
  workflowLoop.setMemento(memento);

  // Register with the workflow loop events
  workflowLoop
    .on(
      'workflowStarted',
      safeCall(async () => workflowReporter.workflowStarted()),
    )
    .on(
      'workflowPaused',
      safeCall(async () => workflowReporter.workflowPaused()),
    )
    .on(
      'workflowResuming',
      safeCall(async () => workflowReporter.workflowResuming()),
    )
    .on(
      'workflowPassed',
      safeCall(async () => workflowReporter.workflowPassed()),
    )
    .on(
      'workflowFailed',
      safeCall(async (...params) => workflowReporter.workflowFailed(...params)),
    );

  // Run one iteration of the workflowLoop
  const decision = await workflowLoop.tick();

  // Deal with the output
  if (_.isEmpty(decision)) {
    throw new Error("The workflow loop tick() method didn't return a decision object");
  }
  const output = {
    shouldWait: 0,
    shouldLoop: 0,
    shouldPass: 0,
    shouldFail: 0,
    memento: workflowLoop.getMemento(),
    slp: stepLoopProvider.getMemento(),
    wp: workflowPayload.getMemento(),
    ssp: stepStateProvider.getMemento(),
  };

  if (!sidFromMeta) output.sid = sid;

  switch (decision.type) {
    case 'loop':
      output.shouldLoop = 1;
      break;
    case 'wait':
    case 'pause':
      output.shouldWait = 1;
      output.wait = decision.wait;
      break;
    case 'pass':
      output.shouldPass = 1;
      break;
    case 'fail':
      output.shouldFail = 1;
      output.error = _.omit(decision, ['type']);
      break;
    default:
      throw new Error(`The workflow loop tick() method returned unsupported decision type of "${decision.type}"`);
  }

  return output;
}