in internal/schedule/inline_scheduler.go [68:103]
// handle processes at most one workflow task instance per call.
//
// All work happens inside a single transaction opened on the DAL client:
//  1. the task instance is fetched with s.fetchTask, invoked through s.lock;
//  2. its input data is passed through filter.FilterWorkflowTaskInputData;
//  3. the task built by task.New is run via retry.Do, configured with
//     retry.Attempts(constants.RetryAttempts);
//  4. the instance status is updated to the success or fail status.
//
// If no task instance is fetched, or task.New returns nil, the callback
// returns nil without updating anything. Errors returned from the transaction
// callback are logged here and not propagated to the caller.
func (s *inlineScheduler) handle() {
	err := dal.GetDalClient().Transaction(func(tx *gorm.DB) error {
		var res *model.WorkflowTaskInstance
		// Fetch through s.lock; res is captured so it can be used once the
		// lock callback has returned.
		if err := s.lock(func() error {
			var err error
			res, err = s.fetchTask(tx)
			return err
		}); err != nil {
			return err
		}
		if res == nil {
			// Nothing was fetched.
			return nil
		}

		// filter input data
		filter.FilterWorkflowTaskInputData(res)

		t := task.New(res)
		if t == nil {
			return nil
		}

		runErr := retry.Do(func() error {
			return t.Run()
		}, retry.Attempts(constants.RetryAttempts))
		if runErr != nil {
			// Record why the task failed: the status update below only stores
			// the fail status, so without this log the cause would be lost.
			log.Get(constants.LogSchedule).Errorf("handle run task instance id=%v error=%v", res.ID, runErr)
			return s.workflowDAL.UpdateTaskInstance(tx, &model.WorkflowTaskInstance{ID: res.ID,
				Status: constants.TaskInstanceFailStatus})
		}
		return s.workflowDAL.UpdateTaskInstance(tx, &model.WorkflowTaskInstance{ID: res.ID,
			Status: constants.TaskInstanceSuccessStatus})
	})
	if err != nil {
		// NOTE(review): this message is also emitted for errors coming from
		// s.lock / s.fetchTask, not only from UpdateTaskInstance.
		log.Get(constants.LogSchedule).Errorf("handle UpdateTaskInstance error=%v", err)
	}
}