in backend/core/runner/run_task.go [38:141]
func RunTask(
ctx gocontext.Context,
basicRes context.BasicRes,
progress chan plugin.RunningProgress,
taskId uint64,
) (err errors.Error) {
db := basicRes.GetDal()
task := &models.Task{}
if err := db.First(task, dal.Where("id = ?", taskId)); err != nil {
return err
}
if task.Status == models.TASK_COMPLETED {
return errors.Default.New("invalid task status")
}
dbPipeline := &models.Pipeline{}
if err := db.First(dbPipeline, dal.Where("id = ? ", task.PipelineId)); err != nil {
return err
}
logger, err := getTaskLogger(basicRes.GetLogger(), task)
if err != nil {
return err
}
beganAt := time.Now()
// make sure task status always correct even if it panicked
defer func() {
if r := recover(); r != nil {
var e error
switch et := r.(type) {
case error:
e = et
default:
e = fmt.Errorf("%v", et)
}
err = errors.Default.Wrap(e, fmt.Sprintf("run task failed with panic (%s)", utils.GatherCallFrames(0)))
logger.Error(err, "run task failed with panic")
}
finishedAt := time.Now()
spentSeconds := finishedAt.Unix() - beganAt.Unix()
if err != nil {
lakeErr := errors.AsLakeErrorType(err)
subTaskName := "unknown"
if lakeErr = lakeErr.As(errors.SubtaskErr); lakeErr != nil {
if meta, ok := lakeErr.GetData().(*plugin.SubTaskMeta); ok {
subTaskName = meta.Name
}
} else {
lakeErr = errors.Convert(err)
}
dbe := db.UpdateColumns(task, []dal.DalSet{
{ColumnName: "status", Value: models.TASK_FAILED},
{ColumnName: "message", Value: lakeErr.Error()},
{ColumnName: "error_name", Value: lakeErr.Messages().Format()},
{ColumnName: "finished_at", Value: finishedAt},
{ColumnName: "spent_seconds", Value: spentSeconds},
{ColumnName: "failed_sub_task", Value: subTaskName},
})
if dbe != nil {
logger.Error(dbe, "failed to finalize task status into db (task failed)")
}
} else {
dbe := db.UpdateColumns(task, []dal.DalSet{
{ColumnName: "status", Value: models.TASK_COMPLETED},
{ColumnName: "message", Value: ""},
{ColumnName: "finished_at", Value: finishedAt},
{ColumnName: "spent_seconds", Value: spentSeconds},
})
if dbe != nil {
logger.Error(dbe, "failed to finalize task status into db (task succeeded)")
}
}
// update finishedTasks
dbe := db.UpdateColumn(
&models.Pipeline{},
"finished_tasks", dal.Expr("finished_tasks + 1"),
dal.Where("id=?", task.PipelineId),
)
if dbe != nil {
logger.Error(dbe, "update pipeline state failed")
}
// not return err if the `SkipOnFail` is true and the error is not canceled
if dbPipeline.SkipOnFail && !errors.Is(err, gocontext.Canceled) {
err = nil
}
}()
// start execution
logger.Info("start executing task: %d", task.ID)
dbe := db.UpdateColumns(task, []dal.DalSet{
{ColumnName: "status", Value: models.TASK_RUNNING},
{ColumnName: "message", Value: ""},
{ColumnName: "began_at", Value: beganAt},
})
if dbe != nil {
return dbe
}
err = RunPluginTask(
ctx,
basicRes.ReplaceLogger(logger),
task,
progress,
)
return err
}