func RerunPipeline()

in backend/server/services/pipeline.go [450:538]


// RerunPipeline schedules a rerun for the pipeline identified by pipelineId.
//
// When task is non-nil only that task is rerun, and it must belong to the
// given pipeline. When task is nil, every task returned by
// GetTasksWithLastStatus whose status is not TASK_COMPLETED is rerun.
//
// For each selected task the previous record is marked TASK_FAILED and a new
// task is created through CreateTask with IsRerun set; finally the pipeline
// status is set to TASK_RERUN. The newly created tasks are returned.
//
// A BadInput error is returned when the pipeline table cannot be locked within
// 2 seconds, when the pipeline is running or already waiting to run, when the
// task does not belong to the pipeline, or when there is nothing to rerun.
//
// The named result err is shared with txHelper (it receives &err), so the
// deferred txHelper.End() can observe the final error value.
// NOTE(review): presumably End commits on nil and rolls back otherwise —
// confirm against dbhelper.
func RerunPipeline(pipelineId uint64, task *models.Task) (tasks []*models.Task, err errors.Error) {
	txHelper := dbhelper.NewTxHelper(basicRes, &err)
	tx := txHelper.Begin()
	defer txHelper.End()

	// Take an exclusive lock on the pipeline table so that the pipeline
	// executor cannot interfere while the rerun is being set up.
	if err = txHelper.LockTablesTimeout(2*time.Second, dal.LockTables{{Table: "_devlake_pipelines", Exclusive: true}}); err != nil {
		return nil, errors.BadInput.Wrap(err, "failed to lock pipeline table, is there any pending pipeline or deletion?")
	}

	// Fetch the pipeline row.
	pipeline := &models.Pipeline{}
	if err = tx.First(pipeline, dal.Where("id = ?", pipelineId)); err != nil {
		return nil, err
	}

	// Only a pipeline that is neither running nor queued may be rerun.
	switch pipeline.Status {
	case models.TASK_RUNNING:
		return nil, errors.BadInput.New("pipeline is running")
	case models.TASK_CREATED, models.TASK_RERUN:
		return nil, errors.BadInput.New("pipeline is waiting to run")
	}

	// Work out which tasks have to be rerun.
	var toRerun []*models.Task
	if task == nil {
		// No explicit task: pick every task that has not completed.
		all, loadErr := GetTasksWithLastStatus(pipelineId)
		if loadErr != nil {
			return nil, errors.Default.Wrap(loadErr, "error getting tasks")
		}
		for _, candidate := range all {
			if candidate.Status == models.TASK_COMPLETED {
				continue
			}
			toRerun = append(toRerun, candidate)
		}
	} else if task.PipelineId != pipelineId {
		return nil, errors.BadInput.New("the task ID and pipeline ID doesn't match")
	} else {
		toRerun = append(toRerun, task)
	}

	// Nothing qualifies for a rerun.
	if len(toRerun) == 0 {
		return nil, errors.BadInput.New("no tasks to be re-ran")
	}

	// Replace each selected task with a fresh one.
	// TODO: CreateTask is not handed tx, so it is unclear from here whether the
	// new task rows take part in this transaction; ideally they should.
	rerunTasks := make([]*models.Task, 0, len(toRerun))
	for _, prev := range toRerun {
		// Flag the previous attempt as failed, both in memory and in storage.
		prev.Status = models.TASK_FAILED
		if updateErr := tx.UpdateColumn(prev, "status", models.TASK_FAILED); updateErr != nil {
			return nil, updateErr
		}

		// Clone the previous task's definition into a new rerun task.
		newTask := &models.NewTask{
			PipelineTask: &plugin.PipelineTask{
				Plugin:   prev.Plugin,
				Subtasks: prev.Subtasks,
				Options:  prev.Options,
			},
			PipelineId:  prev.PipelineId,
			PipelineRow: prev.PipelineRow,
			PipelineCol: prev.PipelineCol,
			IsRerun:     true,
		}
		created, createErr := CreateTask(newTask)
		if createErr != nil {
			return nil, createErr
		}
		rerunTasks = append(rerunTasks, created)
	}

	// Flag the pipeline itself as waiting for a rerun.
	err = tx.UpdateColumn(
		&models.Pipeline{},
		"status", models.TASK_RERUN,
		dal.Where("id = ?", pipelineId),
	)
	if err != nil {
		return nil, err
	}
	return rerunTasks, nil
}