in packages/extensions/core/src/lib/pipeline/pipeline.ts [98:223]
/**
 * Runs a single pipeline node and returns the data source it produces.
 *
 * Steps, in order:
 *  1. Look up the node by name and wait for all of its input nodes (via `getTask`).
 *  2. Build the input scope: `fsInput` when there are no inputs, the sole input when
 *     there is one, otherwise a `QuickDataSource` holding every handle of every input
 *     together with the merged pipe state.
 *  3. Pick the plugin: the `null` plugin, the `identity` (pass-thru) plugin, the
 *     pipeline emitter, or the plugin registered under the node's plugin name.
 *  4. Return the input scope untouched when it is flagged `skip`.
 *  5. When caching is enabled, try to serve the result from the cache; otherwise run
 *     the plugin, log timing, and write the result to the cache.
 *
 * @param nodeName Key of the node inside `pipeline.pipeline`.
 * @returns The plugin output, the cached output, or the input scope itself when skipped.
 * @throws Error when the node, its configuration context, or its plugin cannot be found.
 */
const ScheduleNode: (nodeName: string) => Promise<DataSource> = async (nodeName) => {
  const node = pipeline.pipeline[nodeName];
  if (!node) {
    throw new Error(`Cannot find pipeline node ${nodeName}.`);
  }

  // get input -- Promise.all already hands back resolved data sources, no further awaiting needed.
  // eslint-disable-next-line @typescript-eslint/no-use-before-define
  const inputScopes: Array<DataSource> = await Promise.all(node.inputs.map(getTask));
  let inputScope: DataSource;
  switch (inputScopes.length) {
    case 0:
      inputScope = fsInput;
      break;
    case 1:
      inputScope = inputScopes[0];
      break;
    default: {
      // Several inputs: flatten all of their handles into one source and merge their pipe states.
      let pipeState: PipeState = {};
      const handles: Array<DataHandle> = [];
      for (const scope of inputScopes) {
        pipeState = mergePipeStates(pipeState, scope.pipeState);
        for (const handle of await scope.Enum()) {
          handles.push(await scope.readStrict(handle));
        }
      }
      inputScope = new QuickDataSource(handles, pipeState);
      break;
    }
  }

  const context = pipeline.configs[serializeJsonPointer(node.configScope)];
  if (!context) {
    // Fail with a clear message instead of a TypeError on the first property access below.
    throw new Error(`Cannot find configuration for pipeline node ${nodeName}.`);
  }
  const pluginName = node.pluginName;

  // you can have --pass-thru:FOO on the command line
  // or add pass-thru: true in a pipeline configuration step.
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const configEntry = context.GetEntry(last(node.configScope)!.toString());
  // `some` states the membership test directly (`find` returns the element, not a boolean).
  const passthru =
    configEntry?.["pass-thru"] === true || configView.config["pass-thru"]?.some((x) => x === pluginName);
  const usenull =
    configEntry?.["null"] === true || configView.GetEntry("null")?.some((x: string) => x === pluginName);

  // Precedence: null plugin, then pass-thru (identity), then the pipeline emitter, then registered plugins.
  const plugin = usenull
    ? CORE_PLUGIN_MAP.null
    : passthru
    ? CORE_PLUGIN_MAP.identity
    : pluginName === "pipeline-emitter"
    ? pipelineEmitterPlugin
    : plugins[pluginName]?.plugin;
  if (!plugin) {
    throw new Error(`Plugin '${pluginName}' not found.`);
  }

  if (inputScope.skip) {
    context.debug(`${nodeName} - SKIPPING`);
    return inputScope;
  }

  let cacheKey: string | undefined;
  if (context.config.cachingEnabled) {
    // generate the key used to store/access cached content:
    // md5 over the config folder uri, the node name and the sorted per-input hashes.
    // NOTE(review): if `readData()` is asynchronous, `md5` receives a promise rather than
    // the content here -- confirm against the DataHandle API.
    const names = await inputScope.Enum();
    const data = (
      await Promise.all(names.map((name) => inputScope.readStrict(name).then((uri) => md5(uri.readData()))))
    ).sort();
    cacheKey = md5([context.configFileFolderUri, nodeName, ...data].join("«"));
  }

  // Node start/end/cached messages are promoted to "information" when `times` is set.
  const logLevel = times ? "information" : "debug";

  // if caching is enabled, see if we can find a scopeResult in the cache first.
  if (
    context.config.cachingEnabled &&
    inputScope.cachable &&
    !context.config.cacheExclude.includes(nodeName) &&
    (await isCached(cacheKey))
  ) {
    // shortcut -- get the outputs directly from the cache.
    context.log({
      level: logLevel,
      message: `${nodeName} - CACHED inputs = ${(await inputScope.enum()).length} [0.0 s]`,
    });
    return await readCache(cacheKey, context.DataStore.getDataSink(node.outputArtifact));
  }

  // Timestamps are kept in hundredths of a second so the elapsed time prints with two decimals.
  const t1 = process.uptime() * 100;
  context.log({
    level: logLevel,
    message: `${nodeName} - START inputs = ${(await inputScope.enum()).length}`,
  });

  // runs the actual plugin.
  const scopeResult = await plugin(context, inputScope, context.DataStore.getDataSink(node.outputArtifact));
  const t2 = process.uptime() * 100;
  const memSuffix = context.config.debug ? `[${Math.round(process.memoryUsage().heapUsed / 1024 / 1024)} MB]` : "";
  context.log({
    level: logLevel,
    message: `${nodeName} - END [${Math.floor(t2 - t1) / 100} s]${memSuffix}`,
  });

  // if caching is enabled, let's cache this scopeResult.
  if (context.config.cachingEnabled && cacheKey) {
    await writeCache(cacheKey, scopeResult);
  }

  // if this node wasn't able to load from the cache, then subsequent nodes shall not either
  if (!inputScope.cachable || context.config.cacheExclude.includes(nodeName)) {
    try {
      scopeResult.cachable = false;
    } catch {
      // not settable on fs inputs anyway.
    }
  }

  // Yield the event loop.
  await setImmediatePromise();
  return scopeResult;
};