in packages/extensions/core/src/lib/pipeline/pipeline.ts [46:301]
export async function runPipeline(configView: AutorestContext, fileSystem: IFileSystem): Promise<void> {
const plugins = await loadPlugins(configView);
const __extensionExtension = mapValues(
omitBy(plugins, (x) => x.builtIn),
(x) => x.extension,
);
// __status scope
const startTime = Date.now();
configView.config.raw.__status = new Proxy<any>(
{},
{
get(_, key) {
if (key === "__info") {
return false;
}
const expr = Buffer.from(key.toString(), "base64").toString("ascii");
try {
return JSON.stringify(
safeEval(expr, {
pipeline: pipeline.pipeline,
external: __extensionExtension,
tasks,
startTime,
blame: (uri: string, position: any /*TODO: cleanup, nail type*/) => {
return configView.DataStore.blame(uri, position);
},
}),
(k, v) => (k === "dependencies" ? undefined : v),
2,
);
} catch (e) {
return `${e}`;
}
},
},
);
// TODO: think about adding "number of files in scope" kind of validation in between pipeline steps
const fsInput = configView.DataStore.getReadThroughScope(fileSystem);
const pipeline = buildPipeline(configView, plugins);
const times = !!configView.config["timestamp"];
const tasks: { [name: string]: Promise<DataSource> } = {};
const pipelineEmitterPlugin = createArtifactEmitterPlugin(
async (context) =>
new QuickDataSource([
await context.DataStore.getDataSink().writeObject("pipeline", pipeline.pipeline, ["fix-me-3"], "pipeline"),
]),
);
const ScheduleNode: (nodeName: string) => Promise<DataSource> = async (nodeName) => {
const node = pipeline.pipeline[nodeName];
if (!node) {
throw new Error(`Cannot find pipeline node ${nodeName}.`);
}
// get input
// eslint-disable-next-line @typescript-eslint/no-use-before-define
const inputScopes: Array<DataSource> = await Promise.all(node.inputs.map(getTask));
let inputScope: DataSource;
switch (inputScopes.length) {
case 0:
inputScope = fsInput;
break;
case 1:
inputScope = await inputScopes[0];
break;
default:
{
let pipeState: PipeState = {};
const handles: Array<DataHandle> = [];
for (const pscope of inputScopes) {
const scope = await pscope;
pipeState = mergePipeStates(pipeState, scope.pipeState);
for (const handle of await scope.Enum()) {
handles.push(await scope.readStrict(handle));
}
}
inputScope = new QuickDataSource(handles, pipeState);
}
break;
}
const context = pipeline.configs[serializeJsonPointer(node.configScope)];
const pluginName = node.pluginName;
// you can have --pass-thru:FOO on the command line
// or add pass-thru: true in a pipline configuration step.
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const configEntry = context.GetEntry(last(node.configScope)!.toString());
const passthru =
configEntry?.["pass-thru"] === true || configView.config["pass-thru"]?.find((x) => x === pluginName);
const usenull =
configEntry?.["null"] === true || configView.GetEntry("null")?.find((x: string) => x === pluginName);
const plugin = usenull
? CORE_PLUGIN_MAP.null
: passthru
? CORE_PLUGIN_MAP.identity
: pluginName === "pipeline-emitter"
? pipelineEmitterPlugin
: plugins[pluginName]?.plugin;
if (!plugin) {
throw new Error(`Plugin '${pluginName}' not found.`);
}
if (inputScope.skip) {
context.debug(`${nodeName} - SKIPPING`);
return inputScope;
}
let cacheKey: string | undefined;
if (context.config.cachingEnabled) {
// generate the key used to store/access cached content
const names = await inputScope.Enum();
const data = (
await Promise.all(names.map((name) => inputScope.readStrict(name).then((uri) => md5(uri.readData()))))
).sort();
cacheKey = md5([context.configFileFolderUri, nodeName, ...data].join("«"));
}
// if caching is enabled, see if we can find a scopeResult in the cache first.
// key = inputScope names + md5(inputScope content)
if (
context.config.cachingEnabled &&
inputScope.cachable &&
context.config.cacheExclude.indexOf(nodeName) === -1 &&
(await isCached(cacheKey))
) {
// shortcut -- get the outputs directly from the cache.
context.log({
level: times ? "information" : "debug",
message: `${nodeName} - CACHED inputs = ${(await inputScope.enum()).length} [0.0 s]`,
});
return await readCache(cacheKey, context.DataStore.getDataSink(node.outputArtifact));
}
const t1 = process.uptime() * 100;
context.log({
level: times ? "information" : "debug",
message: `${nodeName} - START inputs = ${(await inputScope.enum()).length}`,
});
// creates the actual plugin.
const scopeResult = await plugin(context, inputScope, context.DataStore.getDataSink(node.outputArtifact));
const t2 = process.uptime() * 100;
const memSuffix = context.config.debug ? `[${Math.round(process.memoryUsage().heapUsed / 1024 / 1024)} MB]` : "";
context.log({
level: times ? "information" : "debug",
message: `${nodeName} - END [${Math.floor(t2 - t1) / 100} s]${memSuffix}`,
});
// if caching is enabled, let's cache this scopeResult.
if (context.config.cachingEnabled && cacheKey) {
await writeCache(cacheKey, scopeResult);
}
// if this node wasn't able to load from the cache, then subsequent nodes shall not either
if (!inputScope.cachable || context.config.cacheExclude.indexOf(nodeName) !== -1) {
try {
scopeResult.cachable = false;
} catch {
// not settable on fs inputs anyway.
}
}
// Yield the event loop.
await setImmediatePromise();
return scopeResult;
};
// schedule pipeline
const getTask = (name: string) => (name in tasks ? tasks[name] : (tasks[name] = ScheduleNode(name)));
// execute pipeline
const barrier = new OutstandingTaskAwaiter();
const barrierRobust = new OutstandingTaskAwaiter();
for (const name of Object.keys(pipeline.pipeline)) {
const node = pipeline.pipeline[name];
node.dependencies = new Array<PipelineNode>();
// find nodes that list this as a antecedent
for (const k of Object.keys(pipeline.pipeline)) {
// does anyone take this as an input?
const candidate = pipeline.pipeline[k];
if (candidate.inputs.indexOf(name)) {
node.dependencies.push(candidate);
}
}
}
for (const name of Object.keys(pipeline.pipeline)) {
// walk thru the list of nodes, and if a given node is skipable beacuse nobody is consuming it
// we'll mark it skip: true
const node = pipeline.pipeline[name];
if (isDrainRequired(node)) {
node.skip = true;
}
}
/*
we should be able to look at all the tasks,
recursively find out who the children are of a given task
and then find out if they all have requireDrain === false
and f
for (const name of Object.keys(pipeline.pipeline)) {
const node = pipeline.pipeline[name];
if (node.requireDrain === true) {
for (const k of Object.keys(pipeline.pipeline) ) {
// does anyone take this as an input?
const candidate= pipeline.pipeline[k];
if( candidate.inputs.indexOf(name) )
}
}
}
*/
for (const name of Object.keys(pipeline.pipeline)) {
const task = getTask(name);
const taskx: { _state: "running" | "failed" | "complete"; _result(): Array<DataHandle>; _finishedAt: number } = <
any
>task;
taskx._state = "running";
task
.then(async (x) => {
const res = await Promise.all((await x.Enum()).map((key) => x.ReadStrict(key)));
taskx._result = () => res;
taskx._state = "complete";
taskx._finishedAt = Date.now();
})
.catch(() => (taskx._state = "failed"));
barrier.await(task);
barrierRobust.await(task.catch(() => {}));
}
try {
await barrier.wait();
await emitStats(configView);
} catch (e) {
// wait for outstanding nodes
try {
await barrierRobust.wait();
} catch {
// wait for others to fail or whatever...
}
throw e;
}
}