const ScheduleNode:()

in packages/extensions/core/src/lib/pipeline/pipeline.ts [98:223]


  /**
   * Runs a single pipeline node and resolves with the data source it produced.
   *
   * Steps, in order:
   *  1. resolve the node's inputs (via `getTask`) and combine them into one input scope;
   *  2. pick the plugin to run, honouring the `null` and `pass-thru` overrides;
   *  3. return the input scope unchanged when it is flagged `skip`;
   *  4. return the cached output when caching is enabled and permitted for this node;
   *  5. otherwise run the plugin, log its duration, and store the result in the cache.
   *
   * @param nodeName - key of the node inside `pipeline.pipeline`.
   * @returns the plugin output, the cached output, or the input scope itself when skipped.
   * @throws Error when `nodeName` is not part of the pipeline or no plugin can be resolved for it.
   */
  const ScheduleNode: (nodeName: string) => Promise<DataSource> = async (nodeName) => {
    const node = pipeline.pipeline[nodeName];
    if (!node) {
      throw new Error(`Cannot find pipeline node ${nodeName}.`);
    }

    // Resolve all upstream nodes first.
    // eslint-disable-next-line @typescript-eslint/no-use-before-define
    const inputScopes: Array<DataSource> = await Promise.all(node.inputs.map(getTask));

    // Collapse the resolved inputs into a single scope.
    // NOTE(review): this function calls both `Enum()` and `enum()` on data sources; confirm both
    // exist on DataSource and unify on one spelling.
    let inputScope: DataSource;
    switch (inputScopes.length) {
      case 0:
        // No upstream node: fall back to the file-system input.
        inputScope = fsInput;
        break;
      case 1:
        inputScope = inputScopes[0];
        break;
      default: {
        // Several upstream nodes: merge their pipe states and concatenate their handles,
        // keeping the order of `node.inputs`.
        let pipeState: PipeState = {};
        const handles: Array<DataHandle> = [];
        for (const scope of inputScopes) {
          pipeState = mergePipeStates(pipeState, scope.pipeState);
          for (const handle of await scope.Enum()) {
            handles.push(await scope.readStrict(handle));
          }
        }
        inputScope = new QuickDataSource(handles, pipeState);
        break;
      }
    }

    const context = pipeline.configs[serializeJsonPointer(node.configScope)];
    const pluginName = node.pluginName;

    // you can have --pass-thru:FOO on the command line
    // or add pass-thru: true in a pipeline configuration step.
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const configEntry = context.GetEntry(last(node.configScope)!.toString());
    // `some` (rather than `find`) yields a real boolean instead of the matched element.
    const passthru =
      configEntry?.["pass-thru"] === true || configView.config["pass-thru"]?.some((x) => x === pluginName);
    const usenull =
      configEntry?.["null"] === true || configView.GetEntry("null")?.some((x: string) => x === pluginName);

    // Precedence: `null` override, then `pass-thru`, then the built-in emitter, then registered plugins.
    const plugin = usenull
      ? CORE_PLUGIN_MAP.null
      : passthru
        ? CORE_PLUGIN_MAP.identity
        : pluginName === "pipeline-emitter"
          ? pipelineEmitterPlugin
          : plugins[pluginName]?.plugin;

    if (!plugin) {
      throw new Error(`Plugin '${pluginName}' not found.`);
    }

    if (inputScope.skip) {
      context.debug(`${nodeName} - SKIPPING`);
      return inputScope;
    }

    const logLevel = times ? "information" : "debug";

    // A node may use the cache only if its input is cachable and it is not explicitly excluded.
    // Kept as a function so the rule is evaluated both before and after the plugin runs,
    // exactly where the two checks below need it.
    const isCacheEligible = (): boolean =>
      Boolean(inputScope.cachable) && !context.config.cacheExclude.includes(nodeName);

    let cacheKey: string | undefined;

    if (context.config.cachingEnabled) {
      // generate the key used to store/access cached content:
      // md5 over the config folder, the node name and the sorted per-input hashes.
      // NOTE(review): `md5` receives whatever `readData()` returns; if `readData()` is asynchronous
      // this hashes a promise rather than the content — confirm.
      const names = await inputScope.Enum();
      const data = (
        await Promise.all(names.map(async (name) => md5((await inputScope.readStrict(name)).readData())))
      ).sort();

      cacheKey = md5([context.configFileFolderUri, nodeName, ...data].join("«"));
    }

    // if caching is enabled, see if we can find a scopeResult in the cache first.
    if (context.config.cachingEnabled && isCacheEligible() && (await isCached(cacheKey))) {
      // shortcut -- get the outputs directly from the cache.
      context.log({
        level: logLevel,
        message: `${nodeName} - CACHED inputs = ${(await inputScope.enum()).length} [0.0 s]`,
      });

      return await readCache(cacheKey, context.DataStore.getDataSink(node.outputArtifact));
    }

    // Timestamps are kept in hundredths of a second so the duration prints with two decimals.
    const t1 = process.uptime() * 100;
    context.log({
      level: logLevel,
      message: `${nodeName} - START inputs = ${(await inputScope.enum()).length}`,
    });

    // Run the plugin against the input scope, writing into this node's output sink.
    const scopeResult = await plugin(context, inputScope, context.DataStore.getDataSink(node.outputArtifact));
    const t2 = process.uptime() * 100;

    const memSuffix = context.config.debug ? `[${Math.round(process.memoryUsage().heapUsed / 1024 / 1024)} MB]` : "";
    context.log({
      level: logLevel,
      message: `${nodeName} - END [${Math.floor(t2 - t1) / 100} s]${memSuffix}`,
    });

    // if caching is enabled, let's cache this scopeResult.
    if (context.config.cachingEnabled && cacheKey) {
      await writeCache(cacheKey, scopeResult);
    }
    // if this node wasn't able to load from the cache, then subsequent nodes shall not either
    if (!isCacheEligible()) {
      try {
        scopeResult.cachable = false;
      } catch {
        // Deliberately best-effort: not settable on fs inputs anyway.
      }
    }

    // Yield the event loop.
    await setImmediatePromise();

    return scopeResult;
  };