packages/cli/src/runtime.ts (103 lines of code) (raw):
import * as fs from 'fs-extra';
import { PipelineMeta, Costa, PipelineWorkSpace, ScriptConfig } from '@pipcook/costa';
import { TaskType, PredictResult, DatasetPool } from '@pipcook/core';
import * as path from 'path';
import { createStandaloneRT } from './standalone-impl';
import { logger, Framework, Plugin, Script } from './utils';
import * as constants from './constants';
/**
 * Options accepted by the standalone runtime constructor.
 */
export interface Options {
  /** Pipeline metadata. */
  pipelineMeta: PipelineMeta;
  /** Workspace directory; should be an absolute path. */
  workspace: string;
  /**
   * Development mode flag. If true, pipcook runs all scripts in their original
   * path without copying them into the workspace, so they can be debugged with
   * node-debugger.
   */
  devMode: boolean;
  /**
   * Cache enabled flag. If true, pipcook looks for the framework in the cache
   * first, before fetching it from remote.
   */
  enableCache: boolean;
  /** Framework mirror base url. */
  mirror: string;
  /**
   * Artifact plugins are installed by an npm client; this option specifies
   * which one to use.
   */
  npmClient: string;
  /** npm registry (optional). */
  registry?: string;
}
/**
 * Runtime for the standalone environment:
 * takes a pipeline configuration, prepares a workspace for it and runs the pipeline.
 *
 * `prepare` must be awaited before calling `train` or `predict`, because it is
 * what creates the script runner and resolves the pipeline scripts.
 */
export class StandaloneRuntime {
  // workspace for pipeline
  private workspace: PipelineWorkSpace;
  // script directory
  private scriptDir: string;
  // instance of Costa, the script runner; created in `prepare`
  private costa: Costa;
  // artifact plugin list; initialized empty so that `train` can iterate over it
  // even when `prepare(false)` skipped the plugin preparation
  private artifactPlugins: Plugin.ArtifactMeta[] = [];
  // scripts in the pipeline; resolved in `prepare`
  private scripts: ScriptConfig;
  // original pipeline metadata
  private pipelineMeta: PipelineMeta;
  // framework mirror
  private mirror: string;
  // cache enable flag
  private enableCache: boolean;
  // npm client for artifact plugin installation
  private npmClient: string;
  // npm registry for artifact plugin installation
  private registry: string | undefined;
  // development mode flag
  private devMode: boolean;

  /**
   * Standalone runtime constructor.
   * Only stores the options and derives the workspace sub-directory paths;
   * nothing is created on disk until `prepare` is called.
   * @param opts options to make a standalone runtime.
   */
  constructor(opts: Options) {
    this.pipelineMeta = opts.pipelineMeta;
    this.mirror = opts.mirror;
    this.enableCache = opts.enableCache;
    this.registry = opts.registry;
    this.npmClient = opts.npmClient;
    this.devMode = opts.devMode;
    this.scriptDir = path.join(opts.workspace, constants.WorkspaceScriptDir);
    this.workspace = {
      dataDir: path.join(opts.workspace, constants.WorkspaceDataDir),
      modelDir: path.join(opts.workspace, constants.WorkspaceModelDir),
      cacheDir: path.join(opts.workspace, constants.WorkspaceCacheDir),
      frameworkDir: path.join(opts.workspace, constants.WorkspaceFrameworkDir)
    };
  }

  /**
   * Make the cache, data, model and script directories in the workspace.
   * The framework directory is deliberately not created here, it should be
   * linked to the cache.
   */
  private async prepareWorkspace(): Promise<void> {
    // the directories are independent of each other, so create them concurrently
    const futures = [
      fs.mkdirp(this.workspace.cacheDir),
      fs.mkdirp(this.workspace.dataDir),
      fs.mkdirp(this.workspace.modelDir),
      fs.mkdirp(this.scriptDir)
    ];
    await Promise.all(futures);
  }

  /**
   * Prepare workspace for pipeline running: directories, framework, scripts,
   * optionally the artifact plugins, and the script runner.
   * @param artifactPlugin whether to prepare the artifact plugins, it's necessary for train but not for predict.
   */
  async prepare(artifactPlugin = true): Promise<void> {
    await this.prepareWorkspace();
    logger.info('preparing framework');
    const framework = await Framework.prepareFramework(this.pipelineMeta, this.workspace.frameworkDir, this.mirror, this.enableCache);
    logger.info('preparing scripts');
    this.scripts = await Script.prepareScript(this.pipelineMeta, this.scriptDir, this.enableCache, this.devMode);
    logger.info('preparing artifact plugins');
    if (artifactPlugin) {
      this.artifactPlugins = await Plugin.prepareArtifactPlugin(this.pipelineMeta, this.npmClient, this.registry);
    }
    this.costa = new Costa({
      workspace: this.workspace,
      framework
    });
    logger.info('initializing framework packages');
    await this.costa.initFramework(this.pipelineMeta.options);
    const modulePath = path.join(this.workspace.frameworkDir, constants.JSModuleDirName);
    const scriptModulePath = path.join(this.scriptDir, constants.JSModuleDirName);
    // only link when the framework ships a module directory and the script
    // directory does not already have one
    if (await fs.pathExists(modulePath) && !await fs.pathExists(scriptModulePath)) {
      // link node_module in framework to script directory
      await fs.ensureSymlink(modulePath, scriptModulePath);
    }
    // link @pipcook/core to node_module
    await Script.linkCoreToScript(scriptModulePath);
  }

  /**
   * Train from pipeline, it should be called after `prepare`.
   * Runs the data source script, the optional data flow script and the model
   * script, writes the pipeline metadata into the model directory, and finally
   * runs every prepared artifact plugin against the model directory.
   */
  async train(): Promise<void> {
    logger.info('running data source script');
    let datasource = await this.costa.runDataSource(this.scripts.datasource);
    logger.info('running data flow script');
    if (this.scripts.dataflow) {
      datasource = await this.costa.runDataflow(datasource, this.scripts.dataflow);
    }
    logger.info('running model script');
    const standaloneRT = createStandaloneRT(datasource, this.workspace.modelDir);
    await this.costa.runModel(standaloneRT, this.scripts.model, this.pipelineMeta.options);
    await fs.writeJson(path.join(this.workspace.modelDir, constants.PipelineFileInModelDir), this.pipelineMeta);
    logger.info(`pipeline finished, the model has been saved at ${this.workspace.modelDir}`);
    // the list is empty when no artifact plugin has been prepared, the loop is then a no-op;
    // plugins run one after another, in list order
    for (const artifact of this.artifactPlugins) {
      logger.info(`running artifact ${artifact.options.processor}`);
      await artifact.artifactExports.build(this.workspace.modelDir, artifact.options);
      logger.info('done');
    }
  }

  /**
   * Predict from pipeline, it should be called after `prepare`.
   * Runs the optional data flow script and then the model script, both with
   * `TaskType.PREDICT`.
   * @param datasource input for predict
   * @returns predict result
   */
  async predict(datasource: DatasetPool.Types.DatasetPool<any, any>): Promise<PredictResult> {
    logger.info('running data flow script');
    if (this.scripts.dataflow) {
      datasource = await this.costa.runDataflow(datasource, this.scripts.dataflow, TaskType.PREDICT);
    }
    logger.info('running model script');
    const standaloneRT = createStandaloneRT(datasource, this.workspace.modelDir);
    return this.costa.runModel(standaloneRT, this.scripts.model, this.pipelineMeta.options, TaskType.PREDICT);
  }
}