in pkg/common/statemachine/statemachine.go [225:340]
func (sm *statemachine) TransitTo(to State, options ...Option) error {
// Locking the statemachine to synchronize state changes
sm.Lock()
defer sm.Unlock()
// checking if transition is allowed
err := sm.isValidTransition(to)
if err != nil {
return err
}
// Creating Transition to pass to callbacks
t := &Transition{
StateMachine: sm,
From: sm.current,
To: to,
Params: nil,
}
// Storing values for reverseTransition
curState := sm.current
// Try to stop state recovery if its transitioning
// from timeout state
if _, ok := sm.timeoutRules[curState]; ok {
log.WithFields(log.Fields{
"task_id": sm.name,
"rule_from ": curState,
"rule_to": to,
"meta_info_noindex": sm.GetMetaInfo(),
"reason": sm.reason,
}).Info("Stopping state timeout recovery")
sm.timer.Stop()
}
// Doing actual transition
sm.current = to
sm.lastUpdatedTime = time.Now()
// Update options
for _, option := range options {
option(sm)
}
// invoking callback function
if sm.rules[curState].Callback != nil {
err = sm.rules[curState].Callback(t)
if err != nil {
log.WithError(err).WithFields(log.Fields{
"task_id": sm.GetName(),
"current_state ": curState,
"to_state": to,
"meta_info_noindex": sm.GetMetaInfo(),
"reason": sm.reason,
}).Error("callback failed for task")
return err
}
}
// Run the transition callback
if sm.transitionCallback != nil {
err = sm.transitionCallback(t)
if err != nil {
log.WithError(err).WithFields(log.Fields{
"task_id": sm.GetName(),
"current_state ": curState,
"to_state": to,
"meta_info_noindex": sm.GetMetaInfo(),
"reason": sm.reason,
}).Error("transition callback failed for task")
return err
}
}
// Checking if this STATE is timeout state
if rule, ok := sm.timeoutRules[to]; ok {
log.WithFields(log.Fields{
"task_id": sm.name,
"rule_from": curState,
"rule_to": to,
"meta_info_noindex": sm.GetMetaInfo(),
"reason": sm.reason,
}).Info("Task transitioned to timeout state")
if rule.Timeout != 0 {
log.WithFields(log.Fields{
"task_id": sm.name,
"rule_from": curState,
"rule_to": to,
"timeout": rule.Timeout.String(),
"reason": sm.reason,
}).Info("Starting timer to recover state if needed")
err := sm.timer.Start(rule.Timeout)
if err != nil {
log.WithError(err).WithFields(log.Fields{
"task_id": sm.name,
"state": to,
}).Error("timer could not be started retrying ...")
sm.timer.Stop()
err = sm.timer.Start(rule.Timeout)
if err != nil {
log.WithError(err).WithFields(log.Fields{
"task_id": sm.name,
"state": to,
}).Error("timer could not be started, returning")
return err
}
}
}
}
log.WithFields(log.Fields{
"task_id": sm.name,
"from_state ": curState,
"to_state": to,
"meta_info_noindex": sm.GetMetaInfo(),
"reason": sm.reason,
}).Info("Task transitioned successfully")
return nil
}