in workflow.go [692:761]
// traverseDAG calls f once for each step in w.Steps, in dependency order.
// A step is kicked off, in its own goroutine, as soon as its list of
// outstanding dependencies (seeded from w.Dependencies) is empty, so
// independent steps run concurrently.
//
// It returns the first non-nil error reported by stepsListen for a running
// step, or nil once no step is waiting or running. When w.Cancel fires, steps
// that have not been started are dropped, steps already running are allowed to
// finish, and the result is nil unless one of those running steps fails.
//
// On an error return the other running steps are not waited for: they continue
// in the background, but no goroutine created here is left blocked forever.
func (w *Workflow) traverseDAG(f func(*Step) DError) DError {
	// waiting = steps and the dependencies they are waiting for.
	// running = the currently running steps.
	// start = map of steps' start channels/semaphores.
	// skip = map of not-yet-started steps' skip channels; closing one
	//        releases that step's goroutine without running f.
	// done = map of steps' done channels for signaling step completion.
	waiting := make(map[string][]string, len(w.Steps))
	var running []string
	start := make(map[string]chan DError, len(w.Steps))
	skip := make(map[string]chan struct{}, len(w.Steps))
	done := make(map[string]chan DError, len(w.Steps))

	// Whatever the exit path (success, cancel, step error), release the
	// goroutines of steps that were never started so they do not leak.
	// Started steps are removed from skip below, so they are unaffected.
	defer func() {
		for _, c := range skip {
			close(c)
		}
	}()

	// Setup: channels, dependencies, and one goroutine per step. Each
	// goroutine waits to be told to start (or to be skipped).
	for name, s := range w.Steps {
		waiting[name] = w.Dependencies[name]
		start[name] = make(chan DError)
		skip[name] = make(chan struct{})
		// Capacity 1: a step that fails after traverseDAG has already
		// returned can still deliver its error and exit instead of blocking
		// on a send nobody will receive.
		done[name] = make(chan DError, 1)

		// The channels are passed as arguments so the goroutines never read
		// the maps, which the loop below keeps modifying.
		go func(s *Step, startCh <-chan DError, skipCh <-chan struct{}, doneCh chan<- DError) {
			// Closing doneCh without a value signals successful completion.
			defer close(doneCh)
			select {
			case err := <-startCh:
				// A closed start channel yields nil: run the step. A non-nil
				// value on the start channel is forwarded without running f.
				if err == nil {
					err = f(s)
				}
				if err != nil {
					doneCh <- err
				}
			case <-skipCh:
				// The step was cancelled or abandoned before it started.
			}
		}(s, start[name], skip[name], done[name])
	}

	// Main signaling logic.
	for len(waiting) != 0 || len(running) != 0 {
		// If we got a Cancel signal, kill all waiting steps.
		// Let running steps finish.
		select {
		case <-w.Cancel:
			waiting = map[string][]string{}
		default:
		}

		// Kick off all steps that aren't waiting for anything.
		for name, deps := range waiting {
			if len(deps) == 0 {
				delete(waiting, name)
				delete(skip, name) // started: must not be released via skip
				running = append(running, name)
				close(start[name])
			}
		}

		// Sanity check. There should be at least one running step. If there
		// is none, every waiting step has an outstanding dependency and
		// nothing is running that could clear it, so only a Cancel signal can
		// change the state. Block on it instead of spinning.
		if len(running) == 0 {
			<-w.Cancel
			waiting = map[string][]string{}
			continue
		}

		// Get next finished step. Return the step error if it erred.
		finished, err := stepsListen(running, done)
		if err != nil {
			return err
		}

		// Remove finished step from other steps' waiting lists.
		for name, deps := range waiting {
			waiting[name] = filter(deps, finished)
		}

		// Remove finished from currently running list.
		running = filter(running, finished)
	}
	return nil
}