state.go (89 lines of code) (raw):
package flow
import (
"context"
"sync"
)
// State is the internal state of a Step in a Workflow.
//
// It has the status and the config (dependency, input, retry option, condition, timeout, etc.) of the step.
// The status could be read / write from different goroutines, so use RWMutex to protect it.
type State struct {
StepResult
Config *StepConfig
sync.RWMutex
}
func (s *State) GetStatus() StepStatus {
s.RLock()
defer s.RUnlock()
return s.Status
}
func (s *State) SetStatus(ss StepStatus) {
s.Lock()
defer s.Unlock()
s.Status = ss
}
func (s *State) GetError() error {
s.RLock()
defer s.RUnlock()
return s.Err
}
func (s *State) SetError(err error) {
s.Lock()
defer s.Unlock()
s.Err = err
}
func (s *State) GetStepResult() StepResult {
s.RLock()
defer s.RUnlock()
return s.StepResult
}
func (s *State) Upstreams() Set[Steper] {
if s.Config == nil {
return nil
}
return s.Config.Upstreams
}
func (s *State) Option() *StepOption {
opt := &StepOption{}
if s.Config != nil && s.Config.Option != nil {
for _, o := range s.Config.Option {
o(opt)
}
}
return opt
}
func (s *State) Before(ctx context.Context, step Steper) (context.Context, error) {
if s.Config == nil || len(s.Config.Before) == 0 {
return ctx, nil
}
for _, b := range s.Config.Before {
var err error
ctx, err = b(ctx, step)
if err != nil {
return ctx, err
}
}
return ctx, nil
}
func (s *State) After(ctx context.Context, step Steper, err error) error {
if s.Config == nil || len(s.Config.After) == 0 {
return err
}
for _, a := range s.Config.After {
err = a(ctx, step, err)
}
return err
}
func (s *State) AddUpstream(up Steper) {
if s.Config == nil {
s.Config = &StepConfig{}
}
if s.Config.Upstreams == nil {
s.Config.Upstreams = make(Set[Steper])
}
if up != nil {
s.Config.Upstreams.Add(up)
}
}
func (s *State) MergeConfig(sc *StepConfig) {
if s.Config == nil {
s.Config = &StepConfig{}
}
s.Config.Merge(sc)
}