condition.go (106 lines of code) (raw):

package flow import ( "context" "errors" "fmt" ) // StepStatus describes the status of a Step. type StepStatus string const ( Pending StepStatus = "" Running StepStatus = "Running" Failed StepStatus = "Failed" Succeeded StepStatus = "Succeeded" Canceled StepStatus = "Canceled" Skipped StepStatus = "Skipped" ) func (s StepStatus) IsTerminated() bool { switch s { case Failed, Succeeded, Canceled, Skipped: return true } return false } func (s StepStatus) String() string { switch s { case Pending: return "Pending" case Running, Failed, Succeeded, Canceled, Skipped: return string(s) default: return fmt.Sprintf("Unknown(%s)", string(s)) } } // Condition is a function to determine what's the next status of Step. // Condition makes the decision based on the status and result of all the Upstream Steps. // Condition is only called when all Upstream Steps are terminated. type Condition func(ctx context.Context, ups map[Steper]StepResult) StepStatus var ( // DefaultCondition used in workflow, defaults to AllSucceeded DefaultCondition Condition = AllSucceeded // DefaultIsCanceled is used to determine whether an error is being regarded as canceled. DefaultIsCanceled = func(err error) bool { switch { case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded), StatusFromError(err) == Canceled: return true } return false } ) // Always: as long as all Upstreams are terminated func Always(context.Context, map[Steper]StepResult) StepStatus { return Running } // AllSucceeded: all Upstreams are Succeeded func AllSucceeded(ctx context.Context, ups map[Steper]StepResult) StepStatus { if DefaultIsCanceled(ctx.Err()) { return Canceled } for _, up := range ups { if up.Status != Succeeded { return Skipped } } return Running } // AnySucceeded: any Upstream is Succeeded func AnySucceeded(ctx context.Context, ups map[Steper]StepResult) StepStatus { if DefaultIsCanceled(ctx.Err()) { return Canceled } for _, up := range ups { if up.Status == Succeeded { return Running } } return Skipped } // AllSucceededOrSkipped: all Upstreams are Succeeded or Skipped func AllSucceededOrSkipped(ctx context.Context, ups map[Steper]StepResult) StepStatus { if DefaultIsCanceled(ctx.Err()) { return Canceled } for _, up := range ups { if up.Status != Succeeded && up.Status != Skipped { return Skipped } } return Running } // BeCanceled: only run when the workflow is canceled func BeCanceled(ctx context.Context, ups map[Steper]StepResult) StepStatus { if DefaultIsCanceled(ctx.Err()) { return Running } return Skipped } // AnyFailed: any Upstream is Failed func AnyFailed(ctx context.Context, ups map[Steper]StepResult) StepStatus { if DefaultIsCanceled(ctx.Err()) { return Canceled } for _, up := range ups { if up.Status == Failed { return Running } } return Skipped } // ConditionOrDefault will use DefaultCondition if cond is nil. func ConditionOrDefault(cond Condition) func(context.Context, map[Steper]StepResult) StepStatus { return func(ctx context.Context, ups map[Steper]StepResult) StepStatus { if cond == nil { return DefaultCondition(ctx, ups) } return cond(ctx, ups) } }