graph/dag.go (150 lines of code) (raw):

// Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. package graph import ( "fmt" "sync" "github.com/pkg/errors" ) const ( rootNodeID = "acb_root" ) // Node represents a vertex in a Dag. type Node struct { Name string Value *Step children map[string]*Node mu sync.Mutex degree int } // NewNode creates a new Node based on the provided name and value. func NewNode(value *Step) *Node { return &Node{ Name: value.ID, Value: value, children: make(map[string]*Node), mu: sync.Mutex{}, degree: 0, } } // GetDegree returns the degree of the node. func (n *Node) GetDegree() int { n.mu.Lock() defer n.mu.Unlock() return n.degree } // Dag represents a thread safe directed acyclic graph. type Dag struct { Root *Node Nodes map[string]*Node mu sync.Mutex } // NewDag creates a new Dag with a root vertex. func NewDag() *Dag { dag := new(Dag) dag.Nodes = make(map[string]*Node) dag.Root = NewNode(&Step{ID: rootNodeID}) return dag } // NewDagFromTask creates a new Dag based on the specified Task. func NewDagFromTask(t *Task) (*Dag, error) { dag := NewDag() var prevStep *Step for _, step := range t.Steps { if err := step.Validate(); err != nil { return dag, err } if _, err := dag.AddVertex(step); err != nil { return dag, err } // If the step is parallel, add it to the root if step.ShouldExecuteImmediately() { if err := dag.AddEdge(rootNodeID, step.ID); err != nil { return dag, err } } else if step.HasNoWhen() { // If the step has no when, add it to the root or the previous step if prevStep == nil { if err := dag.AddEdge(rootNodeID, step.ID); err != nil { return dag, err } } else { if err := dag.AddEdge(prevStep.ID, step.ID); err != nil { return dag, err } } } else { // Otherwise, add edges according to when for _, dep := range step.When { if err := dag.AddEdge(dep, step.ID); err != nil { return dag, err } } } prevStep = step } return dag, nil } // AddVertex adds a vertex to the Dag with the specified name and value. func (d *Dag) AddVertex(value *Step) (*Node, error) { if value.ID == rootNodeID { return nil, fmt.Errorf("%v is a reserved ID, it can't be used", rootNodeID) } d.mu.Lock() defer d.mu.Unlock() if _, ok := d.Nodes[value.ID]; ok { return nil, fmt.Errorf("%s already exists as a vertex", value.ID) } n := NewNode(value) d.Nodes[value.ID] = n return n, nil } // AddEdge adds an edge between from and to. func (d *Dag) AddEdge(from string, to string) error { fromNode, toNode, err := d.validateFromAndTo(from, to) if err != nil { return err } fromNode.mu.Lock() fromNode.children[to] = toNode fromNode.mu.Unlock() toNode.mu.Lock() toNode.degree++ toNode.mu.Unlock() return nil } // RemoveEdge removes the edge between from and to. func (d *Dag) RemoveEdge(from string, to string) error { fromNode, toNode, err := d.validateFromAndTo(from, to) if err != nil { return err } fromNode.mu.Lock() delete(fromNode.children, to) fromNode.mu.Unlock() toNode.mu.Lock() toNode.degree-- toNode.mu.Unlock() return nil } // Children returns the node's children. func (n *Node) Children() []*Node { childNodes := make([]*Node, 0, len(n.children)) n.mu.Lock() defer n.mu.Unlock() for _, v := range n.children { childNodes = append(childNodes, v) } return childNodes } func (d *Dag) validateFromAndTo(from string, to string) (fromNode *Node, toNode *Node, err error) { if from == "" { return nil, nil, errors.New("from cannot be empty") } if to == "" { return nil, nil, errors.New("to cannot be empty") } if from == to { return nil, nil, errors.New("from and to cannot be the same") } d.mu.Lock() defer d.mu.Unlock() if from == rootNodeID { fromNode = d.Root } else { var ok bool if fromNode, ok = d.Nodes[from]; !ok { return nil, nil, fmt.Errorf("%v does not exist as a vertex [from: %v, to: %v]", from, from, to) } } toNode, ok := d.Nodes[to] if !ok { return nil, nil, fmt.Errorf("%v does not exist as a vertex [from: %v, to: %v]", to, from, to) } return fromNode, toNode, nil }