dag.go (172 lines of code) (raw):
package golden
import (
"github.com/emirpasic/gods/queues/linkedlistqueue"
"github.com/emirpasic/gods/sets/hashset"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/hcl/v2/hclsyntax"
"github.com/heimdalr/dag"
)
type Dag struct {
*dag.DAG
}
func newDag() *Dag {
return &Dag{
DAG: dag.NewDAG(),
}
}
func (d *Dag) buildDag(blocks []Block) error {
var walkErr error
for _, b := range blocks {
err := d.AddVertexByID(b.Address(), b)
if err != nil {
walkErr = multierror.Append(walkErr, err)
}
}
for _, b := range blocks {
diag := hclsyntax.Walk(b.HclBlock().Body, newDagWalker(d, b.Address()))
if diag.HasErrors() {
walkErr = multierror.Append(walkErr, diag.Errs()...)
}
}
return walkErr
}
func (d *Dag) addEdge(from, to string) error {
err := d.AddEdge(from, to)
if err != nil {
return err
}
return nil
}
func (d *Dag) runDag(c Config, onReady func(Block) error) error {
var err error
pending := linkedlistqueue.New()
var prePlanBlocks, otherBlocks []Block
for _, v := range d.GetRoots() {
b := v.(Block)
if _, ok := b.(PrePlanBlock); ok {
prePlanBlocks = append(prePlanBlocks, b)
continue
}
otherBlocks = append(otherBlocks, b)
}
for _, b := range prePlanBlocks {
pending.Enqueue(b)
}
for _, b := range otherBlocks {
pending.Enqueue(b)
}
for !pending.Empty() {
next, _ := pending.Dequeue()
b := next.(Block)
// the node has already been expandable and deleted from dag
address := b.Address()
exist := d.exist(address)
if !exist {
continue
}
ancestors, dagErr := d.GetParents(address)
if dagErr != nil {
return dagErr
}
ready := true
for upstreamAddress := range ancestors {
v, dagErr := d.GetVertex(upstreamAddress)
if dagErr != nil {
return dagErr
}
if !v.(Block).isReadyForRead() {
ready = false
break
}
}
if !ready {
continue
}
if b.expandable() {
children, dagErr := d.GetChildren(address)
if dagErr != nil {
return dagErr
}
expandedBlocks, err := c.expandBlock(b)
if err != nil {
return err
}
newPending := linkedlistqueue.New()
for _, eb := range expandedBlocks {
newPending.Enqueue(eb)
}
for _, b := range pending.Values() {
newPending.Enqueue(b)
}
for _, n := range children {
newPending.Enqueue(n)
}
pending = newPending
continue
}
if callbackErr := onReady(b); callbackErr != nil {
err = multierror.Append(err, callbackErr)
}
// this address might be expandable during onReady and no more exist.
exist = d.exist(address)
if !exist {
continue
}
children, dagErr := d.GetChildren(address)
if dagErr != nil {
return dagErr
}
for _, n := range children {
pending.Enqueue(n)
}
}
return err
}
func traverse[T Block](d *Dag, f func(b T) error) error {
var err error
pending := linkedlistqueue.New()
visited := hashset.New()
for _, i := range d.GetRoots() {
pending.Enqueue(i)
}
for !pending.Empty() {
next, _ := pending.Dequeue()
if visited.Contains(next) {
continue
}
nb := next.(Block)
address := nb.Address()
parents, parentErr := d.GetParents(address)
if parentErr != nil {
return parentErr
}
ready := true
for _, p := range parents {
if !visited.Contains(p) {
ready = false
break
}
}
if !ready {
pending.Enqueue(next)
continue
}
visited.Add(next)
if b, ok := nb.(T); ok {
if subError := f(b); subError != nil {
err = multierror.Append(err, subError)
}
}
children, getChildrenErr := d.GetChildren(address)
if getChildrenErr != nil {
return getChildrenErr
}
for _, c := range children {
pending.Enqueue(c)
}
}
return err
}
func (d *Dag) exist(address string) bool {
n, existErr := d.GetVertex(address)
notExist := n == nil || existErr != nil
return !notExist
}