in internal/task/operation_task.go [49:73]
func (t *operationTask) Run() error {
metrics.Inc(constants.MetricsOperationTask, constants.MetricsTotal)
if t.action == nil {
return nil
}
// match end
if t.transition.ToTaskID == constants.TaskEndID {
if t.action != nil {
if err := publishEvent(t.workflowInstanceID, uuid.New().String(), t.action.OperationName, t.input); err != nil {
return err
}
}
return t.workflowDAL.UpdateInstance(context.Background(),
&model.WorkflowInstance{WorkflowInstanceID: t.workflowInstanceID,
WorkflowStatus: constants.WorkflowInstanceSuccessStatus})
}
var taskInstanceID = uuid.New().String()
var taskInstance = model.WorkflowTaskInstance{WorkflowInstanceID: t.workflowInstanceID, WorkflowID: t.workflowID,
TaskID: t.transition.ToTaskID, TaskInstanceID: taskInstanceID, Status: constants.TaskInstanceSleepStatus,
Input: t.baseTask.input}
if err := t.baseTask.queue.Publish([]*model.WorkflowTaskInstance{&taskInstance}); err != nil {
return err
}
return publishEvent(t.workflowInstanceID, taskInstanceID, t.action.OperationName, t.input)
}