core/pipeline.ts (216 lines of code) (raw):

/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0
 */

import { IConfiguredWrapper, IWrapper } from "./ext-types.js";
import pipelineConfigDefaults from "./pipeline-defaults.js";
import { IBuiltStage, IExecutePipelineConfig, IExecutionContext, IExecutionResult, IPipelineConfig, IPipelineContext, IStage, ISynchronizerConfig, IValidationResult } from "./pipeline-types.js"

/*
 * Decide whether a synchronizer must stop running further stages.
 * Termination is requested when execution failed and the configured
 * executionFailureAction is "terminate", OR when validation failed and the
 * configured validationFailureAction is "terminate". Either condition alone
 * is sufficient.
 */
function shouldTerminate(pipelineConfig: IPipelineConfig, isExecutionSuccess: boolean, isValidationSuccess: boolean): boolean {
    const terminateOnExecution = !isExecutionSuccess && pipelineConfig.executionFailureAction === "terminate";
    const terminateOnValidation = !isValidationSuccess && pipelineConfig.validationFailureAction === "terminate";
    return terminateOnExecution || terminateOnValidation;
}

/*
 * Build an executable leaf stage.
 *
 * When executed, the stage creates a generator instance from
 * stage.generator.makeInstance() and sends every batch it yields through
 * stage.datajet.transmitBatch(). Batches are paced to stage.config.batchRate
 * batches per second. The loop ends when the generator is exhausted, when
 * stage.config.batchLimit batches have been sent, or once more than
 * stage.config.timeLimit seconds have elapsed.
 *
 * A failed transmission clears isExecutionSuccess on both the returned
 * execution result and the shared pipeline context.
 */
export const buildStage = (stage: IStage) : IBuiltStage => {

    const executeStage = async function (pipelineConfig: IPipelineConfig, pipelineContext: IPipelineContext, executionContext: IExecutionContext) {
        const executionResult: IExecutionResult = {
            builtStage: this,
            isValidationSuccess: true,
            isExecutionSuccess: true,
            pendingValidators: [],
            children: [],
        }

        const generator = stage.generator.makeInstance();
        const millisecondsPerBatch = 1000 / stage.config.batchRate;
        const startTime = Date.now();
        let batchStartTime = Date.now();
        let batchIndex = 0;

        for await (const batch of generator) {
            // stop once the configured number of batches has been transmitted
            if (batchIndex === stage.config.batchLimit) {
                break;
            }

            const execSuccess = await stage.datajet.transmitBatch(batch)
            executionResult.isExecutionSuccess &&= execSuccess;
            pipelineContext.isExecutionSuccess &&= execSuccess;

            // sleep for whatever is left of this batch's time slot
            const now = Date.now();
            const deltaTime = now - batchStartTime;
            const remainingTime = millisecondsPerBatch - deltaTime;
            await delay(remainingTime);
            ++batchIndex;

            // the time limit is measured at the moment the transmission finished
            if ((now - startTime) > stage.config.timeLimit * 1000) {
                break;
            }
            batchStartTime = Date.now();
        }
        return executionResult;
    };

    const builtStage: IBuiltStage = {
        executeStage: executeStage,
        children: [],
        stageLeaf: stage,
        type: "stage",
    };

    // Bind execute stage so that `this` inside executeStage is the built stage
    builtStage.executeStage = builtStage.executeStage.bind(builtStage);

    return builtStage;
}

/*
 * Build a stage that runs a list of child stages, either concurrently
 * (config.isAsync) or one after another, repeated config.repeat times.
 *
 * config.waitBefore and config.waitAfter are waited (multiplied by 1000,
 * i.e. treated as seconds) around every repetition; config.waitBetween is
 * waited between sequential children.
 *
 * Pending (asynchronous) validators gathered from the children are either
 * awaited here (config.awaitAsyncValidators) or left on the returned
 * execution result so that the parent can await them.
 */
export const synchronizer = ({stages, config} : {stages: Array<IBuiltStage>, config: ISynchronizerConfig}) : IBuiltStage => {

    const executeStage = async function (pipelineConfig: IPipelineConfig, pipelineContext: IPipelineContext, executionContext: IExecutionContext) {
        const executionResult: IExecutionResult = {
            builtStage: this,
            isValidationSuccess: true,
            isExecutionSuccess: true,
            pendingValidators: [],
            children: [],
        }

        // process the pending validators according to synchronizer's configuration
        const processPendingValidators = async () => {
            // punt async validators to parent: they stay on executionResult.pendingValidators
            if (!config.awaitAsyncValidators) {
                return;
            }

            // wait for the validators collected from the children to complete
            const validationResults: Array<IValidationResult> = await Promise.all(executionResult.pendingValidators);
            executionResult.isValidationSuccess &&= validationResults.every(result => result.isValidationSuccess);

            // everything has been awaited, nothing is left for the parent
            executionResult.pendingValidators = [];
        }

        for (let r = 0; r < config.repeat; ++r) {
            await delay(config.waitBefore * 1000);
            let executionResults: Array<IExecutionResult> = [];

            // execute children asynchronously
            if (config.isAsync) {
                executionResults = await Promise.all(
                    stages.map(stage => stage.executeStage(pipelineConfig, pipelineContext, executionContext))
                );

                // summarize execution
                executionResult.isValidationSuccess &&= executionResults.every(result => result.isValidationSuccess);
                executionResult.isExecutionSuccess &&= executionResults.every(result => result.isExecutionSuccess);
            }

            // execute children synchronously
            else {
                for (let i = 0; i < stages.length; ++i) {
                    const childResult = await stages[i].executeStage(pipelineConfig, pipelineContext, executionContext);
                    executionResults.push(childResult);

                    // only an explicit `false` counts as a failure
                    const execSuccess = childResult.isExecutionSuccess !== false;
                    const valiSuccess = childResult.isValidationSuccess !== false;

                    /* toggle global success flags */
                    pipelineContext.isExecutionSuccess &&= execSuccess;
                    pipelineContext.isValidationSuccess &&= valiSuccess;

                    /* toggle local success flags */
                    executionResult.isExecutionSuccess &&= execSuccess;
                    executionResult.isValidationSuccess &&= valiSuccess;

                    if (shouldTerminate(pipelineConfig, execSuccess, valiSuccess)) {
                        break;
                    }

                    if (i !== stages.length - 1) {
                        // NOTE(review): waitBefore/waitAfter are multiplied by 1000 but
                        // waitBetween is passed through as-is - confirm the intended unit.
                        await delay(config.waitBetween);
                    }
                }
            }

            // collect pending validators & execution results
            executionResult.pendingValidators.push(...executionResults.flatMap(results => results.pendingValidators));
            /* repeats will have a node fan out. [a, b, a, b, ...] */
            /* we may run out of memory for canary tests. add new option to not save results */
            executionResult.children.push(...executionResults);

            if (shouldTerminate(pipelineConfig, executionResult.isExecutionSuccess, executionResult.isValidationSuccess)) {
                break;
            }

            await delay(config.waitAfter * 1000);
        }

        // process validators and return result
        await processPendingValidators();
        return executionResult;
    };

    const builtStage: IBuiltStage = {
        executeStage: executeStage,
        children: stages,
        synchronizerConfig: config,
        type: "synchronizer",
    };

    // Bind execute stage so that `this` inside executeStage is the built stage
    builtStage.executeStage = builtStage.executeStage.bind(builtStage);

    return builtStage;
}

/*
 * Wrap a built stage with a wrapper that provides setup, validation and
 * breakdown (teardown) around the child's execution.
 *
 * The wrapper is constructed per execution with an execution context whose
 * managed variables are a JSON deep copy of the parent's. Managed variables
 * may be set during wrapper construction and setup; afterwards
 * setManagedVariable throws.
 *
 * When the wrapper reports isValidationAsync, the validation (followed by
 * breakdown) promise is appended to the result's pendingValidators instead
 * of being awaited here.
 */
export const wrapWith = (wrapperConstructor: (executionContext: IExecutionContext, wrapperTemplate: IWrapper, wrapperConfig: any) => IConfiguredWrapper, wrapperTemplate: IWrapper, wrapperConfig: any, builtChild: IBuiltStage) => {

    const executeStage = async function (pipelineConfig: IPipelineConfig, pipelineContext: IPipelineContext, executionContext: IExecutionContext) {
        const executionResult: IExecutionResult = {
            builtStage: this,
            isValidationSuccess: true,
            isExecutionSuccess: true,
            pendingValidators: [],
            children: [],
        }

        // configure managed variables isolated from parent variables
        let isManagedVariablesMutable = true;
        const deepCopiedManagedVariables = JSON.parse(JSON.stringify(executionContext.managedVariables));
        const setManagedVariable = (key: string, value: any) => {
            if (!isManagedVariablesMutable) {
                throw new Error("Attempting to set managed variables after setup. Managed variables are immutable at this point.");
            }
            deepCopiedManagedVariables[key] = value;
        }
        const isolatedExecutionContext: IExecutionContext = {
            ...executionContext,
            managedVariables: deepCopiedManagedVariables,
            setManagedVariable: setManagedVariable
        }

        // construct wrapper
        const wrapper = wrapperConstructor(isolatedExecutionContext, wrapperTemplate, wrapperConfig);

        // setup
        executionResult.isExecutionSuccess &&= await wrapper.setup(pipelineConfig.rootBuiltStage, builtChild);

        // freeze managedVariables
        isManagedVariablesMutable = false;

        // execute
        const childExecutionResult = await builtChild.executeStage(pipelineConfig, pipelineContext, isolatedExecutionContext);
        executionResult.children = [childExecutionResult];
        /* punt all child validators to parent; copy so the child's own result is not mutated below */
        executionResult.pendingValidators = [...(childExecutionResult?.pendingValidators ?? [])];
        executionResult.isExecutionSuccess &&= childExecutionResult.isExecutionSuccess;
        executionResult.isValidationSuccess &&= childExecutionResult.isValidationSuccess;

        // validate + teardown
        const validate = wrapper.validation(pipelineConfig.rootBuiltStage, builtChild)
            .then(async result => {
                executionResult.validationResult = result;
                executionResult.isValidationSuccess &&= result.isValidationSuccess;
                pipelineContext.isValidationSuccess &&= result.isValidationSuccess;

                // teardown
                executionResult.isExecutionSuccess &&= await wrapper.breakdown(pipelineConfig.rootBuiltStage, builtChild);
                pipelineContext.isExecutionSuccess &&= executionResult.isExecutionSuccess;
                return result;
            });

        if (wrapper?.isValidationAsync ?? false) {
            // asynchronous validation: hand the promise to the parent
            executionResult.pendingValidators.push(validate);
        }
        else {
            await validate;
        }

        // propagate false flags
        pipelineContext.isExecutionSuccess &&= executionResult.isExecutionSuccess;
        pipelineContext.isValidationSuccess &&= executionResult.isValidationSuccess;

        return executionResult;
    };

    const outBuiltStage: IBuiltStage = {
        executeStage: executeStage,
        children: [builtChild],
        wrapperConfig: wrapperConfig,
        type: "wrapper",
        component: wrapperTemplate.name,
    }

    // Bind execute stage so that `this` inside executeStage is the built stage
    outBuiltStage.executeStage = outBuiltStage.executeStage.bind(outBuiltStage);

    return outBuiltStage;
}

/*
 * Pipeline may be deep-frozen to ensure that executePipeline
 * maintains its immutability property.
 *
 * Recursively freezes every own property whose value is an object, then
 * freezes and returns the given object itself. Objects already visited are
 * skipped so that cyclic references do not cause endless recursion.
 */
export const deepFreezePipeline = (pipelineRoot: Object) => {
    const visited = new WeakSet<object>();

    const freezeNode = (node: Object): Readonly<Object> => {
        // Retrieve the property names defined on object
        const propNames = Object.getOwnPropertyNames(node);

        // Freeze properties before freezing self
        for (const name of propNames) {
            const value: unknown = (node as Record<string, unknown>)[name];

            if (value && typeof value === "object" && !visited.has(value)) {
                visited.add(value);
                freezeNode(value);
            }
        }

        return Object.freeze(node);
    };

    return freezeNode(pipelineRoot);
}

/*
 * Execute a built pipeline from its root stage.
 *
 * The effective configuration is the pipeline defaults overridden by
 * executePipelineConfig. After the root stage finishes, the pending
 * validators left on its result are awaited, a summary is logged and the
 * root execution result is returned.
 */
export const executePipeline = async (pipelineRoot: IBuiltStage, executePipelineConfig: IExecutePipelineConfig) => {
    const pipelineConfig: IPipelineConfig = {
        rootBuiltStage: pipelineRoot,
        ...pipelineConfigDefaults,
        ...executePipelineConfig,
    }

    const startTime = Date.now();
    console.log (`Executing pipeline`);

    // shared, pipeline-wide success flags toggled by the stages
    const pipelineContext: IPipelineContext = {
        isValidationSuccess: true,
        isExecutionSuccess: true,
    }

    // root context: no managed variables, and setting one is a no-op
    const executionContext: IExecutionContext = {
        managedVariables: {},
        setManagedVariable: (key: string, value: any) => {}
    }

    const executionResult: IExecutionResult = await pipelineRoot.executeStage(pipelineConfig, pipelineContext, executionContext);
    await Promise.all(executionResult.pendingValidators); /* evaluate pending validators */

    // console.log(`Execution results: ${JSON.stringify(executionResult, null, 2)}`);
    console.log(`Elapsed time: ${(Date.now() - startTime) / 1000} seconds`);
    console.log(`Execution results:`);
    console.log("Execution success: ", pipelineContext.isExecutionSuccess);
    console.log("Validation success: ", pipelineContext.isValidationSuccess);
    console.log("Executed pipeline.");
    return executionResult;
}

/* Resolve with null after the given number of milliseconds (via setTimeout). */
function delay(milliseconds: number) : Promise<null> {
    return new Promise(resolve => {
        setTimeout(() => {
            resolve(null)
        }, milliseconds);
    })
}