func()

in pkg/common/statemachine/statemachine.go [225:340]


func (sm *statemachine) TransitTo(to State, options ...Option) error {
	// Locking the statemachine to synchronize state changes
	sm.Lock()
	defer sm.Unlock()

	// checking if transition is allowed
	err := sm.isValidTransition(to)
	if err != nil {
		return err
	}

	//  Creating Transition to pass to callbacks
	t := &Transition{
		StateMachine: sm,
		From:         sm.current,
		To:           to,
		Params:       nil,
	}

	// Storing values for reverseTransition
	curState := sm.current

	// Try to stop state recovery if its transitioning
	// from timeout state
	if _, ok := sm.timeoutRules[curState]; ok {
		log.WithFields(log.Fields{
			"task_id":           sm.name,
			"rule_from ":        curState,
			"rule_to":           to,
			"meta_info_noindex": sm.GetMetaInfo(),
			"reason":            sm.reason,
		}).Info("Stopping state timeout recovery")
		sm.timer.Stop()
	}

	// Doing actual transition
	sm.current = to
	sm.lastUpdatedTime = time.Now()
	// Update options
	for _, option := range options {
		option(sm)
	}

	// invoking callback function
	if sm.rules[curState].Callback != nil {
		err = sm.rules[curState].Callback(t)
		if err != nil {
			log.WithError(err).WithFields(log.Fields{
				"task_id":           sm.GetName(),
				"current_state ":    curState,
				"to_state":          to,
				"meta_info_noindex": sm.GetMetaInfo(),
				"reason":            sm.reason,
			}).Error("callback failed for task")
			return err
		}
	}

	// Run the transition callback
	if sm.transitionCallback != nil {
		err = sm.transitionCallback(t)
		if err != nil {
			log.WithError(err).WithFields(log.Fields{
				"task_id":           sm.GetName(),
				"current_state ":    curState,
				"to_state":          to,
				"meta_info_noindex": sm.GetMetaInfo(),
				"reason":            sm.reason,
			}).Error("transition callback failed for task")
			return err
		}
	}
	// Checking if this STATE is timeout state
	if rule, ok := sm.timeoutRules[to]; ok {
		log.WithFields(log.Fields{
			"task_id":           sm.name,
			"rule_from":         curState,
			"rule_to":           to,
			"meta_info_noindex": sm.GetMetaInfo(),
			"reason":            sm.reason,
		}).Info("Task transitioned to timeout state")
		if rule.Timeout != 0 {
			log.WithFields(log.Fields{
				"task_id":   sm.name,
				"rule_from": curState,
				"rule_to":   to,
				"timeout":   rule.Timeout.String(),
				"reason":    sm.reason,
			}).Info("Starting timer to recover state if needed")
			err := sm.timer.Start(rule.Timeout)
			if err != nil {
				log.WithError(err).WithFields(log.Fields{
					"task_id": sm.name,
					"state":   to,
				}).Error("timer could not be started retrying ...")
				sm.timer.Stop()
				err = sm.timer.Start(rule.Timeout)
				if err != nil {
					log.WithError(err).WithFields(log.Fields{
						"task_id": sm.name,
						"state":   to,
					}).Error("timer could not be started, returning")
					return err
				}
			}
		}
	}
	log.WithFields(log.Fields{
		"task_id":           sm.name,
		"from_state ":       curState,
		"to_state":          to,
		"meta_info_noindex": sm.GetMetaInfo(),
		"reason":            sm.reason,
	}).Info("Task transitioned successfully")
	return nil
}