in workflow.go [365:430]
// tick performs one scheduling pass over the Steps held in w.steps.
//
// It returns true right away when w.IsTerminated() reports true. Otherwise it
// visits every Step and returns false once the pass is finished. Only Steps
// that are Pending and whose upstreams have all terminated are acted upon:
// the Step's Condition (DefaultCondition when its Option supplies none) is
// evaluated first, and if it yields a terminated status that status is
// recorded without running the Step. Otherwise, provided w.lease() succeeds,
// the Step is marked Running and executed in its own goroutine.
//
// Every goroutine started here is registered with w.waitGroup.
func (w *Workflow) tick(ctx context.Context) bool {
	if w.IsTerminated() {
		return true
	}
	for step := range w.steps {
		state := w.StateOf(step)
		// Only Pending Steps are considered.
		if state.GetStatus() != Pending {
			continue
		}
		// Every upstream must have terminated before the Step is considered.
		upstreams := w.UpstreamOf(step)
		if isAnyUpstreamNotTerminated(upstreams) {
			continue
		}
		// Choose the Condition: the one from the Step's Option if present,
		// DefaultCondition otherwise.
		opt := state.Option()
		condition := DefaultCondition
		if opt != nil && opt.Condition != nil {
			condition = opt.Condition
		}
		// A terminated verdict from the Condition settles the Step without
		// running it.
		decided := condition(ctx, upstreams)
		if decided.IsTerminated() {
			state.SetStatus(decided)
			w.waitGroup.Add(1)
			go func() {
				defer w.waitGroup.Done()
				// signalStatusChange locks w.statusChange.L, hence the
				// separate goroutine.
				w.signalStatusChange()
			}()
			continue
		}
		// Without a lease the Step is left untouched (still Pending).
		if !w.lease() {
			continue
		}
		// Kick off the Step.
		state.SetStatus(Running)
		w.waitGroup.Add(1)
		go func(runCtx context.Context, s Steper, st *State) {
			// Deferred calls run in reverse order: publish the result,
			// give the lease back, then release the wait group.
			defer w.waitGroup.Done()
			defer w.unlease()
			var (
				runErr error
				result = Failed
			)
			defer func() {
				st.SetStatus(result)
				st.SetError(runErr)
				w.signalStatusChange()
			}()
			runErr = w.runStep(runCtx, s, st)
			if runErr == nil {
				result = Succeeded
				return
			}
			result = StatusFromError(runErr)
			if result != Failed {
				return
			}
			// A Failed status gets extra checks: cancellation-like errors
			// are reported as Canceled instead.
			if DefaultIsCanceled(runErr) ||
				errors.Is(runErr, context.Canceled) ||
				errors.Is(runErr, context.DeadlineExceeded) {
				result = Canceled
			}
		}(ctx, step, state)
	}
	return false
}