func()

in dag.go [46:130]


// runDag visits the blocks of the graph starting from its roots and invokes
// onReady for every block whose parents all report isReadyForRead.
//
// Roots implementing PrePlanBlock are queued ahead of the remaining roots.
// A block that reports expandable() is not passed to onReady; instead it is
// handed to c.expandBlock and the resulting blocks are placed at the front of
// the work queue, with the block's former children appended at the back.
//
// Errors returned by onReady do not stop the walk: they are collected with
// multierror.Append and returned once the queue drains. Errors from graph
// lookups or from c.expandBlock abort the walk immediately and are returned
// as-is (callback errors collected up to that point are not included).
func (d *Dag) runDag(c Config, onReady func(Block) error) error {
	// Partition the roots so PrePlanBlock roots are processed first.
	var prePlan, rest []Block
	for _, root := range d.GetRoots() {
		blk := root.(Block)
		if _, isPrePlan := blk.(PrePlanBlock); isPrePlan {
			prePlan = append(prePlan, blk)
		} else {
			rest = append(rest, blk)
		}
	}

	queue := linkedlistqueue.New()
	for _, blk := range prePlan {
		queue.Enqueue(blk)
	}
	for _, blk := range rest {
		queue.Enqueue(blk)
	}

	// callbackErrs accumulates the non-fatal errors reported by onReady.
	var callbackErrs error

visit:
	for !queue.Empty() {
		// The queue was just checked to be non-empty, so the ok flag is not needed.
		item, _ := queue.Dequeue()
		current := item.(Block)
		addr := current.Address()

		// Skip blocks that are no longer in the graph (presumably removed by
		// an earlier expansion).
		if !d.exist(addr) {
			continue
		}

		parents, parentsErr := d.GetParents(addr)
		if parentsErr != nil {
			return parentsErr
		}
		// A block with any parent that is not ready is dropped for now; it is
		// queued again as a child when one of its parents is processed.
		for parentAddr := range parents {
			vertex, vertexErr := d.GetVertex(parentAddr)
			if vertexErr != nil {
				return vertexErr
			}
			if !vertex.(Block).isReadyForRead() {
				continue visit
			}
		}

		if current.expandable() {
			// Children are captured before expanding, in the same order as
			// the calls were originally made.
			successors, successorsErr := d.GetChildren(addr)
			if successorsErr != nil {
				return successorsErr
			}
			expanded, expandErr := c.expandBlock(current)
			if expandErr != nil {
				return expandErr
			}
			// Rebuild the queue: expanded blocks first, then whatever was
			// still waiting, then the children of the expanded block.
			reordered := linkedlistqueue.New()
			for _, fresh := range expanded {
				reordered.Enqueue(fresh)
			}
			for _, waiting := range queue.Values() {
				reordered.Enqueue(waiting)
			}
			for _, child := range successors {
				reordered.Enqueue(child)
			}
			queue = reordered
			continue
		}

		if cbErr := onReady(current); cbErr != nil {
			callbackErrs = multierror.Append(callbackErrs, cbErr)
		}

		// onReady may have caused this block to be removed from the graph, so
		// re-check before asking for its children.
		if !d.exist(addr) {
			continue
		}
		successors, successorsErr := d.GetChildren(addr)
		if successorsErr != nil {
			return successorsErr
		}
		for _, child := range successors {
			queue.Enqueue(child)
		}
	}
	return callbackErrs
}