in backend/server/services/pipeline.go [450:538]
func RerunPipeline(pipelineId uint64, task *models.Task) (tasks []*models.Task, err errors.Error) {
// prevent pipeline executor from doing anything that might jeopardize the integrity
pipeline := &models.Pipeline{}
txHelper := dbhelper.NewTxHelper(basicRes, &err)
tx := txHelper.Begin()
defer txHelper.End()
err = txHelper.LockTablesTimeout(2*time.Second, dal.LockTables{{Table: "_devlake_pipelines", Exclusive: true}})
if err != nil {
err = errors.BadInput.Wrap(err, "failed to lock pipeline table, is there any pending pipeline or deletion?")
return
}
// load the pipeline
err = tx.First(pipeline, dal.Where("id = ?", pipelineId))
if err != nil {
return nil, err
}
// verify the status
if pipeline.Status == models.TASK_RUNNING {
return nil, errors.BadInput.New("pipeline is running")
}
if pipeline.Status == models.TASK_CREATED || pipeline.Status == models.TASK_RERUN {
return nil, errors.BadInput.New("pipeline is waiting to run")
}
// determine which tasks to rerun
var failedTasks []*models.Task
if task != nil {
if task.PipelineId != pipelineId {
return nil, errors.BadInput.New("the task ID and pipeline ID doesn't match")
}
failedTasks = append(failedTasks, task)
} else {
tasks, err := GetTasksWithLastStatus(pipelineId)
if err != nil {
return nil, errors.Default.Wrap(err, "error getting tasks")
}
for _, t := range tasks {
if t.Status != models.TASK_COMPLETED {
failedTasks = append(failedTasks, t)
}
}
}
// no tasks to rerun
if len(failedTasks) == 0 {
return nil, errors.BadInput.New("no tasks to be re-ran")
}
// create new tasks
// TODO: this is better to be wrapped inside a transaction
rerunTasks := []*models.Task{}
for _, t := range failedTasks {
// mark previous task failed
t.Status = models.TASK_FAILED
err := tx.UpdateColumn(t, "status", models.TASK_FAILED)
if err != nil {
return nil, err
}
// create new task
rerunTask, err := CreateTask(&models.NewTask{
PipelineTask: &plugin.PipelineTask{
Plugin: t.Plugin,
Subtasks: t.Subtasks,
Options: t.Options,
},
PipelineId: t.PipelineId,
PipelineRow: t.PipelineRow,
PipelineCol: t.PipelineCol,
IsRerun: true,
})
if err != nil {
return nil, err
}
// append to result
rerunTasks = append(rerunTasks, rerunTask)
}
// mark pipline rerun
err = tx.UpdateColumn(&models.Pipeline{},
"status", models.TASK_RERUN,
dal.Where("id = ?", pipelineId),
)
if err != nil {
return nil, err
}
return rerunTasks, nil
}