func()

in workflow.go [365:430]


// tick performs a single scheduling pass over the Steps of the Workflow.
//
// It returns true when the Workflow already reports itself terminated (no
// work is scheduled in that case) and false after a pass has been made.
//
// For every Step that is still Pending and whose upstreams have all
// terminated, the Step's Condition (DefaultCondition unless the Step's option
// overrides it) is evaluated:
//   - if the Condition yields a terminated status, that status is recorded
//     and a status-change signal is sent from a separate goroutine;
//   - otherwise, if a lease can be obtained, the Step is marked Running and
//     executed in its own goroutine.
//
// Every goroutine started here is registered with w.waitGroup.
func (w *Workflow) tick(ctx context.Context) bool {
	if w.IsTerminated() {
		return true
	}
	for step := range w.steps {
		st := w.StateOf(step)
		if st.GetStatus() != Pending {
			// anything that is not Pending has already been handled
			continue
		}
		upstreams := w.UpstreamOf(step)
		if isAnyUpstreamNotTerminated(upstreams) {
			// wait until every upstream has reached a terminated status
			continue
		}

		// pick the Condition: the per-Step override wins over the default
		condition := DefaultCondition
		if opt := st.Option(); opt != nil && opt.Condition != nil {
			condition = opt.Condition
		}

		decided := condition(ctx, upstreams)
		if decided.IsTerminated() {
			// the Condition settled the Step without running it
			st.SetStatus(decided)
			w.waitGroup.Add(1)
			go func() {
				defer w.waitGroup.Done()
				// signalStatusChange locks w.statusChange.L, hence the
				// dedicated goroutine instead of a direct call
				w.signalStatusChange()
			}()
			continue
		}

		// the Step should run; it starts only if a lease is granted now,
		// otherwise it stays Pending
		if !w.lease() {
			continue
		}
		st.SetStatus(Running)
		w.waitGroup.Add(1)
		go func(ctx context.Context, s Steper, st *State) {
			defer w.waitGroup.Done()
			defer w.unlease()

			var (
				runErr error
				final  = Failed // reported unless overwritten below
			)
			// record the outcome and notify, whatever path leaves this goroutine
			defer func() {
				st.SetStatus(final)
				st.SetError(runErr)
				w.signalStatusChange()
			}()

			if runErr = w.runStep(ctx, s, st); runErr == nil {
				final = Succeeded
				return
			}
			final = StatusFromError(runErr)
			if final != Failed {
				return
			}
			// a Failed result is downgraded to Canceled when the error is
			// recognized as a cancellation or a context expiry
			if DefaultIsCanceled(runErr) ||
				errors.Is(runErr, context.Canceled) ||
				errors.Is(runErr, context.DeadlineExceeded) {
				final = Canceled
			}
		}(ctx, step, st)
	}
	return false
}