in internal/task/switch_task.go [54:91]
// Run evaluates the switch task's transitions in order and dispatches the
// first one whose condition holds.
//
// For every transition that does not point at constants.TaskEndID, the
// transition's Condition is evaluated via t.jq.One against the JSON-decoded
// task input. The result is converted to a string and parsed with
// strconv.ParseBool. The first transition that yields true causes a new task
// instance (status constants.TaskInstanceWaitStatus, carrying the same input)
// to be published to the queue, and Run returns the publish result.
//
// If no transition matches, the workflow instance is updated to
// constants.WorkflowInstanceSuccessStatus. If there are no transitions at all,
// Run returns nil without touching the workflow instance.
//
// Any error from decoding the input, evaluating a condition, or parsing the
// condition result as a boolean is returned immediately.
func (t *switchTask) Run() error {
	metrics.Inc(constants.MetricsSwitchTask, constants.MetricsTotal)
	if len(t.transitions) == 0 {
		return nil
	}

	// The input is identical for every transition, so it is decoded at most
	// once. Decoding is deferred until the first transition that actually needs
	// evaluating, so a task whose transitions all lead to the end task never
	// parses (or fails on) its input.
	// NOTE(review): assumes t.jq.One does not mutate the value it is given —
	// confirm against the jq wrapper.
	var (
		jqData  interface{}
		decoded bool
	)

	for _, transition := range t.transitions {
		// Transitions to the end task are never evaluated here; reaching the
		// end is handled by the fall-through after the loop.
		if transition.ToTaskID == constants.TaskEndID {
			continue
		}

		if !decoded {
			if err := json.Unmarshal([]byte(t.input), &jqData); err != nil {
				return err
			}
			decoded = true
		}

		res, err := t.jq.One(jqData, transition.Condition)
		if err != nil {
			return err
		}
		// The condition result must be something strconv.ParseBool accepts
		// once stringified; anything else is reported as an error.
		matched, err := strconv.ParseBool(gconv.String(res))
		if err != nil {
			return err
		}
		if !matched {
			metrics.Inc(constants.MetricsSwitchTask, constants.MetricsSwitchReject)
			continue
		}

		metrics.Inc(constants.MetricsSwitchTask, constants.MetricsSwitchPass)
		taskInstance := model.WorkflowTaskInstance{
			WorkflowInstanceID: t.workflowInstanceID,
			WorkflowID:         t.workflowID,
			TaskID:             transition.ToTaskID,
			TaskInstanceID:     uuid.New().String(),
			Status:             constants.TaskInstanceWaitStatus,
			Input:              t.baseTask.input,
		}
		// Only the first matching transition is dispatched.
		return t.baseTask.queue.Publish([]*model.WorkflowTaskInstance{&taskInstance})
	}

	// No transition matched: mark the workflow instance as successfully finished.
	return t.workflowDAL.UpdateInstance(context.Background(),
		&model.WorkflowInstance{
			WorkflowInstanceID: t.workflowInstanceID,
			WorkflowStatus:     constants.WorkflowInstanceSuccessStatus,
		})
}