in dag.go [46:130]
func (d *Dag) runDag(c Config, onReady func(Block) error) error {
var err error
pending := linkedlistqueue.New()
var prePlanBlocks, otherBlocks []Block
for _, v := range d.GetRoots() {
b := v.(Block)
if _, ok := b.(PrePlanBlock); ok {
prePlanBlocks = append(prePlanBlocks, b)
continue
}
otherBlocks = append(otherBlocks, b)
}
for _, b := range prePlanBlocks {
pending.Enqueue(b)
}
for _, b := range otherBlocks {
pending.Enqueue(b)
}
for !pending.Empty() {
next, _ := pending.Dequeue()
b := next.(Block)
// the node has already been expandable and deleted from dag
address := b.Address()
exist := d.exist(address)
if !exist {
continue
}
ancestors, dagErr := d.GetParents(address)
if dagErr != nil {
return dagErr
}
ready := true
for upstreamAddress := range ancestors {
v, dagErr := d.GetVertex(upstreamAddress)
if dagErr != nil {
return dagErr
}
if !v.(Block).isReadyForRead() {
ready = false
break
}
}
if !ready {
continue
}
if b.expandable() {
children, dagErr := d.GetChildren(address)
if dagErr != nil {
return dagErr
}
expandedBlocks, err := c.expandBlock(b)
if err != nil {
return err
}
newPending := linkedlistqueue.New()
for _, eb := range expandedBlocks {
newPending.Enqueue(eb)
}
for _, b := range pending.Values() {
newPending.Enqueue(b)
}
for _, n := range children {
newPending.Enqueue(n)
}
pending = newPending
continue
}
if callbackErr := onReady(b); callbackErr != nil {
err = multierror.Append(err, callbackErr)
}
// this address might be expandable during onReady and no more exist.
exist = d.exist(address)
if !exist {
continue
}
children, dagErr := d.GetChildren(address)
if dagErr != nil {
return dagErr
}
for _, n := range children {
pending.Enqueue(n)
}
}
return err
}