core/rockefeller.ts (159 lines of code) (raw):
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
import { IBuiltStage, IExecutionContext, IPipelineConfig, IPipelineContext } from "./pipeline-types.js";
import generatorTemplates from "../generators/generator-index.js"
import wrapperTemplates from "../wrappers/wrapper-index.js"
import datajetTemplates from "../datajets/datajet-index.js"
import { buildStage, synchronizer, wrapWith } from "./pipeline.js";
import { error } from "console";
import { IComponentDependencies, IConfiguredGenerator, IWrapper } from "./ext-types.js";
import { initDependencies } from "./component-dependencies.js";
import { synchronizerConfigDefaults } from "./pipeline-defaults.js";
/**
 * Declarative description of a single node in a pipeline.
 *
 * The same shape is used for every kind of node; which fields are actually
 * read depends on `component` (see `buildPipeline`).
 */
export interface IPipelineSchema {
    /**
     * Kind of node to build. When omitted the node is built as a stage.
     * Otherwise it is matched against the built-in component names and the
     * names of the registered wrapper templates.
     */
    component?: string;
    /** Key under which this node is registered when it appears in a parent's `library`. */
    referenceId?: string;
    /** Schemas that are built up front and exposed to this node through its dependencies' library. */
    library?: IPipelineSchema[];
    /** Values merged over the inherited `variables.defined` for this node. */
    definitions?: Record<string, any>;
    /** Component configuration; read for synchronizer and wrapper nodes. */
    config?: any;
    /** Child nodes; read for synchronizer nodes. */
    children?: IPipelineSchema[];
    /** Single wrapped child; read for wrapper nodes. */
    child?: IPipelineSchema;
    /** Generator selection; `name` and optional `config` are read when building a generator. */
    generator: any;
    /** Datajet selection; `name` and optional `config` are read when building a stage. */
    datajet: any;
    /** Stage configuration, passed through as the stage's `config`. */
    stage: any; /* remove and replace with config */
}
/*
 * Built-in values of `IPipelineSchema.component` that `buildPipeline` compares against.
 * Wrapper components are not listed here; they are matched by wrapper template name instead.
 */
enum IComponentName {
/* Also used when the schema omits `component`. */
Stage = "stage", /* default */
/* Built from the schema's `children`. */
Synchronizer = "synchronizer",
/* NOTE(review): no branch in buildPipeline handles this value in this file. */
Validator = "validator",
/* Built as a stub stage carrying a configured generator. */
Generator = "generator",
/* all wrappers are also components */
}
/*
 * Minimal built-stage skeleton: no children, and an `executeStage` that
 * immediately resolves with a successful result. It is spread into concrete
 * stage objects (see buildPipelineGenerator).
 */
const genericBuiltStageStub = {
    children: [],
    /*
     * Declared as a method, not an arrow function. An arrow function here
     * would capture the module-level `this` (undefined in an ES module), so
     * `builtStage` would never refer to a stage. As a method, `this` is the
     * object the call is made on, which is the concrete stage when this stub
     * has been spread into one.
     * NOTE(review): assumes callers invoke it as `stage.executeStage(...)`;
     * a detached call still yields an undefined `builtStage`, as before.
     *
     * `this` is typed `any` because the receiver is whichever stage object
     * the stub was spread into, not necessarily this literal.
     */
    executeStage(this: any, _pipelineConfig: IPipelineConfig, _pipelineContext: IPipelineContext, _executionContext: IExecutionContext) {
        return Promise.resolve({
            builtStage: this,
            isValidationSuccess: true,
            isExecutionSuccess: true,
            pendingValidators: [],
            children: [],
        });
    },
};
/**
 * Recursively builds the stage described by a pipeline schema node.
 *
 * On the root call (no `componentDependencies`), dependencies are created
 * from the schema with `initDependencies` and the managed `testUUID` is
 * logged. Each node then derives its own dependencies (library entries and
 * definitions) before dispatching on `buildSchema.component`.
 *
 * @param buildSchema - Schema node to build.
 * @param componentDependencies - Dependencies inherited from the parent node; omit for the root node.
 * @returns The built stage for this node.
 * @throws Error if `buildSchema.component` is neither a built-in component nor a registered wrapper name.
 */
export function buildPipeline(buildSchema: IPipelineSchema, componentDependencies?: IComponentDependencies) : IBuiltStage {
    /* Checking the value (not arguments.length) also covers an explicitly passed `undefined`. */
    let dependencies = componentDependencies;
    if (dependencies === undefined) {
        dependencies = initDependencies(buildSchema);
        dependencies.logger.info(`testUUID: ${dependencies.variables.managed.testUUID}`);
    }

    const derivedDependencies = deriveDependencies(buildSchema, dependencies);
    const component = buildSchema.component;

    if (!component || component === IComponentName.Stage) {
        return buildPipelineStage(buildSchema, derivedDependencies);
    }
    if (component === IComponentName.Synchronizer) {
        return buildPipelineSynchronizer(buildSchema, derivedDependencies);
    }
    if (component === IComponentName.Generator) {
        return buildPipelineGenerator(buildSchema, derivedDependencies);
    }
    if (wrapperTemplates.some((w) => component === w.name)) {
        return buildPipelineWrapper(buildSchema, derivedDependencies);
    }

    /* console.error returns undefined, so throwing its result would throw `undefined`; throw a real Error. */
    throw new Error(`Unsupported pipeline component in schema: ${component}`);
}
/**
 * Produces the dependencies for one schema node from its parent's dependencies.
 *
 * - Every schema in `buildSchema.library` is built and registered under its
 *   `referenceId`, overriding inherited library entries with the same key.
 *   Library entries are built with the parent's dependencies, so they do not
 *   see this node's own `definitions`.
 * - `buildSchema.definitions` are merged over the inherited `variables.defined`.
 * - `localPipelineSchema` is set to this node's schema.
 *
 * The input `componentDependencies` object is not mutated.
 *
 * @param buildSchema - Schema node whose library and definitions are applied.
 * @param componentDependencies - Dependencies inherited from the parent node.
 * @returns A new dependencies object for this node.
 */
function deriveDependencies(buildSchema: IPipelineSchema, componentDependencies: IComponentDependencies) : IComponentDependencies {
    const libraryComponents: {[key: string]: IBuiltStage} = {};
    for (const libBuildSchema of buildSchema.library ?? []) {
        libraryComponents[libBuildSchema.referenceId] = buildPipeline(libBuildSchema, componentDependencies);
    }

    /* Node-level definitions take precedence over inherited ones. */
    const definitions: {[key: string]: any} = {
        ...componentDependencies.variables.defined,
        ...(buildSchema.definitions ?? {}),
    };

    return {
        ...componentDependencies,
        library: {
            ...componentDependencies.library,
            ...libraryComponents,
        },
        variables: {
            ...componentDependencies.variables,
            defined: definitions,
        },
        localPipelineSchema: buildSchema,
    };
}
/**
 * Builds a stage node: a configured generator paired with a configured
 * datajet, plus the schema's `stage` value as the stage config.
 *
 * Generator construction is delegated to `buildGenerator`. The datajet
 * template is looked up by `buildSchema.datajet.name`; its `defaultConfig`
 * is overlaid with `buildSchema.datajet.config` when present.
 *
 * @param buildSchema - Schema node providing `generator`, `datajet` and `stage`.
 * @param componentDependencies - Dependencies handed to the generator and datajet factories.
 * @returns The stage produced by `buildStage`.
 * @throws Error if no datajet template matches the requested name.
 */
function buildPipelineStage(buildSchema: IPipelineSchema, componentDependencies: IComponentDependencies) : IBuiltStage {
    /* Generator first, then datajet, matching the original construction order. */
    const generator = buildGenerator(buildSchema, componentDependencies);

    const datajetName = buildSchema.datajet?.name;
    const datajetTemplate = datajetTemplates
        .find((candidate) => candidate.name === datajetName);
    if (!datajetTemplate) {
        /* Fail with the offending name instead of a TypeError on `undefined`. */
        throw new Error(`Unknown datajet in pipeline schema: ${datajetName}`);
    }
    const datajet = datajetTemplate.createConfiguredDatajet({
        ...datajetTemplate.defaultConfig,
        ...(buildSchema.datajet.config ?? {}),
    }, componentDependencies);

    return buildStage({
        generator,
        datajet,
        config: buildSchema.stage,
    });
}
/**
 * Builds a synchronizer node.
 *
 * Each schema in `buildSchema.children` (none if absent) is built with this
 * node's dependencies, and the resulting stages are handed to `synchronizer`
 * together with `synchronizerConfigDefaults` overlaid by `buildSchema.config`.
 *
 * @param buildSchema - Schema node providing `children` and optional `config`.
 * @param componentDependencies - Dependencies passed down to every child.
 * @returns The stage produced by `synchronizer`.
 */
function buildPipelineSynchronizer(buildSchema: IPipelineSchema, componentDependencies: IComponentDependencies) : IBuiltStage {
    const stages: Array<IBuiltStage> = [];
    for (const childSchema of buildSchema.children ?? []) {
        stages.push(buildPipeline(childSchema, componentDependencies));
    }

    const config = {
        ...synchronizerConfigDefaults,
        ...(buildSchema.config ?? {}),
    };

    return synchronizer({ stages, config });
}
/**
 * Builds a generator node: the generic stage stub tagged with type
 * "generator" and carrying the configured generator as `data`.
 *
 * @param buildSchema - Schema node providing the `generator` selection.
 * @param componentDependencies - Dependencies handed to the generator factory.
 * @returns A stub stage whose `data` is the configured generator.
 */
function buildPipelineGenerator(buildSchema: IPipelineSchema, componentDependencies: IComponentDependencies) : IBuiltStage {
    const configuredGenerator = buildGenerator(buildSchema, componentDependencies);
    const generatorStage: IBuiltStage = {
        ...genericBuiltStageStub,
        type: "generator",
        data: configuredGenerator,
    };
    return generatorStage;
}
/**
 * Creates the configured generator selected by `buildSchema.generator`.
 *
 * The generator template is looked up by `buildSchema.generator.name`; its
 * `defaultConfig` is overlaid with `buildSchema.generator.config` when present.
 *
 * @param buildSchema - Schema node providing the `generator` selection.
 * @param componentDependencies - Dependencies handed to the generator factory.
 * @returns The generator returned by the template's `createConfiguredGenerator`.
 * @throws Error if no generator template matches the requested name.
 */
function buildGenerator(buildSchema: IPipelineSchema, componentDependencies: IComponentDependencies) : IConfiguredGenerator {
    const generatorName = buildSchema.generator?.name;
    /* Single lookup; the template is reused for both the defaults and the factory call. */
    const generatorTemplate = generatorTemplates
        .find((candidate) => candidate.name === generatorName);
    if (!generatorTemplate) {
        /* Fail with the offending name instead of a TypeError on `undefined`. */
        throw new Error(`Unknown generator in pipeline schema: ${generatorName}`);
    }
    return generatorTemplate.createConfiguredGenerator({
        ...generatorTemplate.defaultConfig,
        ...(buildSchema.generator.config ?? {}),
    }, componentDependencies);
}
/**
 * Builds a wrapper node around a single child.
 *
 * The wrapper template is selected by `buildSchema.component`. The child
 * schema is shallow-copied and passed through the template's
 * `modifySubschema` before being built. The wrapper itself is not created
 * here: a constructor closure is handed to `wrapWith`, which receives the
 * execution context, template and config when it is invoked.
 *
 * @param buildSchema - Schema node providing `component`, `config` and `child`.
 * @param componentDependencies - Build-time dependencies captured by the wrapper constructor.
 * @returns The stage produced by `wrapWith`.
 */
function buildPipelineWrapper(buildSchema: IPipelineSchema, componentDependencies: IComponentDependencies) : IBuiltStage {
    const config = buildSchema.config;
    const template = wrapperTemplates.find((candidate) => candidate.name === buildSchema.component);

    /* Let the wrapper adjust a shallow copy of its child's schema. */ /* future: add deep copy utility */
    const childSchema = template.modifySubschema({...buildSchema.child});

    /* Build dependencies are captured in this closure; execution-time state arrives via its arguments. */
    const constructWrapper = (executionContext: IExecutionContext, activeTemplate: IWrapper, activeConfig: any) => {
        /* Execution-level managed variables override the build-time ones. */
        const managed = {
            ...componentDependencies.variables.managed,
            ...executionContext.managedVariables,
        };

        const resolvedConfig = {
            ...activeTemplate.defaultConfig,
            ...(activeConfig ?? {}),
        };

        const wrapperDependencies = {
            ...componentDependencies,
            variables: {
                ...componentDependencies.variables,
                managed,
            },
            /* Writes go to both the wrapper-level copy and the execution context. */
            setManagedVariable: (key: string, value: any) => {
                managed[key] = value;
                executionContext.setManagedVariable(key, value);
            },
        };

        return activeTemplate.createConfiguredWrapper(resolvedConfig, wrapperDependencies);
    };

    const wrappedStage = buildPipeline(childSchema, componentDependencies);
    return wrapWith(constructWrapper, template, config, wrappedStage);
}