packages/@aws-cdk/toolkit-lib/lib/api/work-graph/work-graph.ts (310 lines of code) (raw):
import type { WorkNode, StackNode, AssetBuildNode, AssetPublishNode } from './work-graph-types';
import { DeploymentState } from './work-graph-types';
import { parallelPromises } from '../../util';
import { IO, type IoHelper } from '../io/private';
import { ToolkitError } from '../toolkit-error';
export type Concurrency = number | Record<WorkNode['type'], number>;
export class WorkGraph {
public readonly nodes: Record<string, WorkNode>;
private readonly readyPool: Array<WorkNode> = [];
private readonly lazyDependencies = new Map<string, string[]>();
private readonly ioHelper: IoHelper;
public error?: Error;
public constructor(nodes: Record<string, WorkNode>, ioHelper: IoHelper) {
this.nodes = { ...nodes };
this.ioHelper = ioHelper;
}
public addNodes(...nodes: WorkNode[]) {
for (const node of nodes) {
if (this.nodes[node.id]) {
throw new ToolkitError(`Duplicate use of node id: ${node.id}`);
}
const ld = this.lazyDependencies.get(node.id);
if (ld) {
for (const x of ld) {
node.dependencies.add(x);
}
this.lazyDependencies.delete(node.id);
}
this.nodes[node.id] = node;
}
}
public removeNode(nodeId: string | WorkNode) {
const id = typeof nodeId === 'string' ? nodeId : nodeId.id;
const removedNode = this.nodes[id];
this.lazyDependencies.delete(id);
delete this.nodes[id];
if (removedNode) {
for (const node of Object.values(this.nodes)) {
node.dependencies.delete(removedNode.id);
}
}
}
/**
* Return all nodes of a given type
*/
public nodesOfType<T extends WorkNode['type']>(type: T): Extract<WorkNode, { type: T }>[] {
return Object.values(this.nodes).filter(n => n.type === type) as any;
}
/**
* Return all nodes that depend on a given node
*/
public dependees(nodeId: string | WorkNode) {
const id = typeof nodeId === 'string' ? nodeId : nodeId.id;
return Object.values(this.nodes).filter(n => n.dependencies.has(id));
}
/**
* Add a dependency, that may come before or after the nodes involved
*/
public addDependency(fromId: string, toId: string) {
const node = this.nodes[fromId];
if (node) {
node.dependencies.add(toId);
return;
}
let lazyDeps = this.lazyDependencies.get(fromId);
if (!lazyDeps) {
lazyDeps = [];
this.lazyDependencies.set(fromId, lazyDeps);
}
lazyDeps.push(toId);
}
public tryGetNode(id: string): WorkNode | undefined {
return this.nodes[id];
}
public node(id: string) {
const ret = this.nodes[id];
if (!ret) {
throw new ToolkitError(`No node with id ${id} among ${Object.keys(this.nodes)}`);
}
return ret;
}
public absorb(graph: WorkGraph) {
this.addNodes(...Object.values(graph.nodes));
}
private hasFailed(): boolean {
return Object.values(this.nodes).some((n) => n.deploymentState === DeploymentState.FAILED);
}
public doParallel(concurrency: Concurrency, actions: WorkGraphActions) {
return this.forAllArtifacts(concurrency, async (x: WorkNode) => {
switch (x.type) {
case 'stack':
await actions.deployStack(x);
break;
case 'asset-build':
await actions.buildAsset(x);
break;
case 'asset-publish':
await actions.publishAsset(x);
break;
}
});
}
/**
* Return the set of unblocked nodes
*/
public async ready(): Promise<ReadonlyArray<WorkNode>> {
await this.updateReadyPool();
return this.readyPool;
}
private forAllArtifacts(n: Concurrency, fn: (x: WorkNode) => Promise<void>): Promise<void> {
const graph = this;
// If 'n' is a number, we limit all concurrency equally (effectively we will be using totalMax)
// If 'n' is a record, we limit each job independently (effectively we will be using max)
const max: Record<WorkNode['type'], number> = typeof n === 'number' ?
{
'asset-build': n,
'asset-publish': n,
'stack': n,
} : n;
const totalMax = typeof n === 'number' ? n : sum(Object.values(n));
return new Promise((ok, fail) => {
let active: Record<WorkNode['type'], number> = {
'asset-build': 0,
'asset-publish': 0,
'stack': 0,
};
function totalActive() {
return sum(Object.values(active));
}
start();
function start() {
graph.updateReadyPool().then(() => {
for (let i = 0; i < graph.readyPool.length; ) {
const node = graph.readyPool[i];
if (active[node.type] < max[node.type] && totalActive() < totalMax) {
graph.readyPool.splice(i, 1);
startOne(node);
} else {
i += 1;
}
}
if (totalActive() === 0) {
if (graph.done()) {
ok();
}
// wait for other active deploys to finish before failing
if (graph.hasFailed()) {
fail(graph.error);
}
}
}).catch((e) => {
fail(e);
});
}
function startOne(x: WorkNode) {
x.deploymentState = DeploymentState.DEPLOYING;
active[x.type]++;
void fn(x)
.finally(() => {
active[x.type]--;
})
.then(() => {
graph.deployed(x);
start();
}).catch((err) => {
// By recording the failure immediately as the queued task exits, we prevent the next
// queued task from starting.
graph.failed(x, err);
start();
});
}
});
}
private done(): boolean {
return Object.values(this.nodes).every((n) => DeploymentState.COMPLETED === n.deploymentState);
}
private deployed(node: WorkNode) {
node.deploymentState = DeploymentState.COMPLETED;
}
private failed(node: WorkNode, error?: Error) {
this.error = error;
node.deploymentState = DeploymentState.FAILED;
this.skipRest();
this.readyPool.splice(0);
}
public toString() {
return [
'digraph D {',
...Object.entries(this.nodes).flatMap(([id, node]) => renderNode(id, node)),
'}',
].join('\n');
function renderNode(id: string, node: WorkNode): string[] {
const ret = [];
if (node.deploymentState === DeploymentState.COMPLETED) {
ret.push(` ${gv(id, { style: 'filled', fillcolor: 'yellow', comment: node.note })};`);
} else {
ret.push(` ${gv(id, { comment: node.note })};`);
}
for (const dep of node.dependencies) {
ret.push(` ${gv(id)} -> ${gv(dep)};`);
}
return ret;
}
}
/**
* Ensure all dependencies actually exist. This protects against scenarios such as the following:
* StackA depends on StackB, but StackB is not selected to deploy. The dependency is redundant
* and will be dropped.
* This assumes the manifest comes uncorrupted so we will not fail if a dependency is not found.
*/
public removeUnavailableDependencies() {
for (const node of Object.values(this.nodes)) {
const removeDeps = Array.from(node.dependencies).filter((dep) => this.nodes[dep] === undefined);
removeDeps.forEach((d) => {
node.dependencies.delete(d);
});
}
}
/**
* Remove all asset publishing steps for assets that are already published, and then build
* that aren't used anymore.
*
* Do this in parallel, because there may be a lot of assets in an application (seen in practice: >100 assets)
*/
public async removeUnnecessaryAssets(isUnnecessary: (x: AssetPublishNode) => Promise<boolean>) {
await this.ioHelper.notify(IO.DEFAULT_TOOLKIT_DEBUG.msg('Checking for previously published assets'));
const publishes = this.nodesOfType('asset-publish');
const classifiedNodes = await parallelPromises(
8,
publishes.map((assetNode) => async() => [assetNode, await isUnnecessary(assetNode)] as const));
const alreadyPublished = classifiedNodes.filter(([_, unnecessary]) => unnecessary).map(([assetNode, _]) => assetNode);
for (const assetNode of alreadyPublished) {
this.removeNode(assetNode);
}
await this.ioHelper.notify(IO.DEFAULT_TOOLKIT_DEBUG.msg(`${publishes.length} total assets, ${publishes.length - alreadyPublished.length} still need to be published`));
// Now also remove any asset build steps that don't have any dependencies on them anymore
const unusedBuilds = this.nodesOfType('asset-build').filter(build => this.dependees(build).length === 0);
for (const unusedBuild of unusedBuilds) {
this.removeNode(unusedBuild);
}
}
private async updateReadyPool() {
const activeCount = Object.values(this.nodes).filter((x) => x.deploymentState === DeploymentState.DEPLOYING).length;
const pendingCount = Object.values(this.nodes).filter((x) => x.deploymentState === DeploymentState.PENDING).length;
const newlyReady = Object.values(this.nodes).filter((x) =>
x.deploymentState === DeploymentState.PENDING &&
Array.from(x.dependencies).every((id) => this.node(id).deploymentState === DeploymentState.COMPLETED));
// Add newly available nodes to the ready pool
for (const node of newlyReady) {
node.deploymentState = DeploymentState.QUEUED;
this.readyPool.push(node);
}
// Remove nodes from the ready pool that have already started deploying
retainOnly(this.readyPool, (node) => node.deploymentState === DeploymentState.QUEUED);
// Sort by reverse priority
this.readyPool.sort((a, b) => (b.priority ?? 0) - (a.priority ?? 0));
if (this.readyPool.length === 0 && activeCount === 0 && pendingCount > 0) {
const cycle = this.findCycle() ?? ['No cycle found!'];
await this.ioHelper.notify(IO.DEFAULT_TOOLKIT_TRACE.msg(`Cycle ${cycle.join(' -> ')} in graph ${this}`));
throw new ToolkitError(`Unable to make progress anymore, dependency cycle between remaining artifacts: ${cycle.join(' -> ')} (run with -vv for full graph)`);
}
}
private skipRest() {
for (const node of Object.values(this.nodes)) {
if ([DeploymentState.QUEUED, DeploymentState.PENDING].includes(node.deploymentState)) {
node.deploymentState = DeploymentState.SKIPPED;
}
}
}
/**
* Find cycles in a graph
*
* Not the fastest, but effective and should be rare
*/
public findCycle(): string[] | undefined {
const seen = new Set<string>();
const self = this;
for (const nodeId of Object.keys(this.nodes)) {
const cycle = recurse(nodeId, [nodeId]);
if (cycle) {
return cycle;
}
}
return undefined;
function recurse(nodeId: string, path: string[]): string[] | undefined {
if (seen.has(nodeId)) {
return undefined;
}
try {
for (const dep of self.nodes[nodeId].dependencies ?? []) {
const index = path.indexOf(dep);
if (index > -1) {
return [...path.slice(index), dep];
}
const cycle = recurse(dep, [...path, dep]);
if (cycle) {
return cycle;
}
}
return undefined;
} finally {
seen.add(nodeId);
}
}
}
/**
* Whether the `end` node is reachable from the `start` node, following the dependency arrows
*/
public reachable(start: string, end: string): boolean {
const seen = new Set<string>();
const self = this;
return recurse(start);
function recurse(current: string) {
if (seen.has(current)) {
return false;
}
seen.add(current);
if (current === end) {
return true;
}
for (const dep of self.nodes[current].dependencies) {
if (recurse(dep)) {
return true;
}
}
return false;
}
}
}
export interface WorkGraphActions {
deployStack: (stackNode: StackNode) => Promise<void>;
buildAsset: (assetNode: AssetBuildNode) => Promise<void>;
publishAsset: (assetNode: AssetPublishNode) => Promise<void>;
}
function sum(xs: number[]) {
let ret = 0;
for (const x of xs) {
ret += x;
}
return ret;
}
function retainOnly<A>(xs: A[], pred: (x: A) => boolean) {
xs.splice(0, xs.length, ...xs.filter(pred));
}
function gv(id: string, attrs?: Record<string, string | undefined>) {
const attrString = Object.entries(attrs ?? {}).flatMap(([k, v]) => v !== undefined ? [`${k}="${v}"`] : []).join(',');
return attrString ? `"${simplifyId(id)}" [${attrString}]` : `"${simplifyId(id)}"`;
}
function simplifyId(id: string) {
return id.replace(/([0-9a-f]{6})[0-9a-f]{6,}/g, '$1');
}