func RunTask()

in backend/core/runner/run_task.go [38:141]


// RunTask executes the task identified by taskId and records its outcome in the database.
//
// It loads the task row and the pipeline row the task belongs to, refuses to run a task
// whose status is already models.TASK_COMPLETED, marks the task as models.TASK_RUNNING and
// then delegates the actual work to RunPluginTask using a task-specific logger.
//
// Parameters:
//   - ctx is forwarded to RunPluginTask.
//   - basicRes supplies the database access layer and the base logger.
//   - progress is forwarded to RunPluginTask.
//   - taskId is matched against the "id" column of the task table.
//
// Once the task logger has been created, a deferred finalizer is installed. It runs whether
// the function returns normally, returns an error, or panics, and it:
//   - converts a recovered panic into an error that carries the gathered call frames,
//   - writes the final status, finish time and elapsed seconds to the task row (plus the
//     message, error name and failed subtask name when the task failed),
//   - increments finished_tasks on the owning pipeline,
//   - resets the returned error to nil when the pipeline has SkipOnFail set and the error
//     is not gocontext.Canceled.
//
// Errors raised while persisting the final state are only logged. Errors that occur before
// the finalizer is installed (loading the task or pipeline, the completed-status check,
// creating the logger) are returned directly without touching the task row.
func RunTask(
	ctx gocontext.Context,
	basicRes context.BasicRes,
	progress chan plugin.RunningProgress,
	taskId uint64,
) (err errors.Error) {
	db := basicRes.GetDal()
	task := &models.Task{}
	// Distinct local names are used below so the named result `err`, which the deferred
	// finalizer reads and rewrites, is never shadowed.
	if loadErr := db.First(task, dal.Where("id = ?", taskId)); loadErr != nil {
		return loadErr
	}
	if task.Status == models.TASK_COMPLETED {
		return errors.Default.New("invalid task status")
	}
	dbPipeline := &models.Pipeline{}
	if loadErr := db.First(dbPipeline, dal.Where("id = ? ", task.PipelineId)); loadErr != nil {
		return loadErr
	}
	logger, err := getTaskLogger(basicRes.GetLogger(), task)
	if err != nil {
		return err
	}
	beganAt := time.Now()
	// make sure task status always correct even if it panicked
	defer func() {
		// recover must be called directly in the deferred function to stop the panic.
		if r := recover(); r != nil {
			var e error
			switch et := r.(type) {
			case error:
				e = et
			default:
				e = fmt.Errorf("%v", et)
			}
			err = errors.Default.Wrap(e, fmt.Sprintf("run task failed with panic (%s)", utils.GatherCallFrames(0)))
			logger.Error(err, "run task failed with panic")
		}
		finishedAt := time.Now()
		// Sub uses the monotonic clock readings carried by both time.Now() values, so the
		// elapsed time stays correct even if the wall clock is adjusted while the task runs.
		spentSeconds := int64(finishedAt.Sub(beganAt) / time.Second)
		if err != nil {
			lakeErr := errors.AsLakeErrorType(err)
			// "unknown" is kept unless the error exposes a SubtaskErr whose data is a
			// *plugin.SubTaskMeta, in which case that subtask's name is recorded.
			subTaskName := "unknown"
			if lakeErr = lakeErr.As(errors.SubtaskErr); lakeErr != nil {
				if meta, ok := lakeErr.GetData().(*plugin.SubTaskMeta); ok {
					subTaskName = meta.Name
				}
			} else {
				// No SubtaskErr found: fall back to the error itself for message/error_name.
				lakeErr = errors.Convert(err)
			}
			dbe := db.UpdateColumns(task, []dal.DalSet{
				{ColumnName: "status", Value: models.TASK_FAILED},
				{ColumnName: "message", Value: lakeErr.Error()},
				{ColumnName: "error_name", Value: lakeErr.Messages().Format()},
				{ColumnName: "finished_at", Value: finishedAt},
				{ColumnName: "spent_seconds", Value: spentSeconds},
				{ColumnName: "failed_sub_task", Value: subTaskName},
			})
			if dbe != nil {
				logger.Error(dbe, "failed to finalize task status into db (task failed)")
			}
		} else {
			dbe := db.UpdateColumns(task, []dal.DalSet{
				{ColumnName: "status", Value: models.TASK_COMPLETED},
				{ColumnName: "message", Value: ""},
				{ColumnName: "finished_at", Value: finishedAt},
				{ColumnName: "spent_seconds", Value: spentSeconds},
			})
			if dbe != nil {
				logger.Error(dbe, "failed to finalize task status into db (task succeeded)")
			}
		}
		// update finishedTasks: the counter is bumped for failed and succeeded tasks alike
		dbe := db.UpdateColumn(
			&models.Pipeline{},
			"finished_tasks", dal.Expr("finished_tasks + 1"),
			dal.Where("id=?", task.PipelineId),
		)
		if dbe != nil {
			logger.Error(dbe, "update pipeline state failed")
		}
		// not return err if the `SkipOnFail` is true and the error is not canceled
		if dbPipeline.SkipOnFail && !errors.Is(err, gocontext.Canceled) {
			err = nil
		}
	}()

	// start execution
	logger.Info("start executing task: %d", task.ID)
	startErr := db.UpdateColumns(task, []dal.DalSet{
		{ColumnName: "status", Value: models.TASK_RUNNING},
		{ColumnName: "message", Value: ""},
		{ColumnName: "began_at", Value: beganAt},
	})
	if startErr != nil {
		// Returning here still triggers the finalizer above, which marks the task as failed.
		return startErr
	}

	err = RunPluginTask(
		ctx,
		basicRes.ReplaceLogger(logger),
		task,
		progress,
	)
	return err
}