func RunPluginSubTasks()

in backend/core/runner/run_task.go [176:349]


func RunPluginSubTasks(
	ctx gocontext.Context,
	basicRes context.BasicRes,
	task *models.Task,
	pluginTask plugin.PluginTask,
	progress chan plugin.RunningProgress,
	syncPolicy *models.SyncPolicy,
) errors.Error {
	logger := basicRes.GetLogger()
	logger.Info("start plugin")
	// find out all possible subtasks this plugin can offer
	subtaskMetas := pluginTask.SubTaskMetas()
	subtasksFlag := make(map[string]bool)
	for _, subtaskMeta := range subtaskMetas {
		subtasksFlag[subtaskMeta.Name] = subtaskMeta.EnabledByDefault
	}
	/* subtasksFlag example
	subtasksFlag := map[string]bool{
		"collectProject": true,
		"convertCommits": true,
		...
	}
	*/

	// user specifies what subtasks to run
	if len(task.Subtasks) != 0 {
		// decode user specified subtasks
		var specifiedTasks []string
		err := api.Decode(task.Subtasks, &specifiedTasks, nil)
		if err != nil {
			return errors.Default.Wrap(err, "subtasks could not be decoded")
		}
		if len(specifiedTasks) > 0 {
			// first, disable all subtasks
			for task := range subtasksFlag {
				subtasksFlag[task] = false
			}
			// second, check specified subtasks is valid and enable them if so
			for _, task := range specifiedTasks {
				if _, ok := subtasksFlag[task]; ok {
					subtasksFlag[task] = true
				} else {
					return errors.Default.New(fmt.Sprintf("subtask %s does not exist", task))
				}
			}
		}
	}

	// 1. make sure `Collect` subtasks skip if `SkipCollectors` is true
	// 2. make sure `Required` subtasks are always enabled
	for _, subtaskMeta := range subtaskMetas {
		if syncPolicy != nil && syncPolicy.SkipCollectors && strings.Contains(strings.ToLower(subtaskMeta.Name), "collect") {
			subtasksFlag[subtaskMeta.Name] = false
		}
		if subtaskMeta.Required {
			subtasksFlag[subtaskMeta.Name] = true
		}
	}

	// calculate total step(number of task to run)
	steps := 0
	for _, enabled := range subtasksFlag {
		if enabled {
			steps++
		}
	}

	taskCtx := contextimpl.NewDefaultTaskContext(ctx, basicRes, task.Plugin, subtasksFlag, progress)
	if closeablePlugin, ok := pluginTask.(plugin.CloseablePluginTask); ok {
		defer closeablePlugin.Close(taskCtx)
	}
	options := task.Options
	taskData, err := pluginTask.PrepareTaskData(taskCtx, options)
	if err != nil {
		return errors.Default.Wrap(err, fmt.Sprintf("error preparing task data for %s", task.Plugin))
	}
	taskCtx.SetSyncPolicy(syncPolicy)
	taskCtx.SetData(taskData)

	// record subtasks sequence to DB
	collectSubtaskNumber := 0
	otherSubtaskNumber := 0
	isCollector := false
	subtask := []models.Subtask{}
	for _, subtaskMeta := range subtaskMetas {
		subtaskCtx, err := taskCtx.SubTaskContext(subtaskMeta.Name)
		if err != nil {
			// sth went wrong
			return errors.Default.Wrap(err, fmt.Sprintf("error getting context subtask %s", subtaskMeta.Name))
		}
		if subtaskCtx == nil {
			// subtask was disabled
			continue
		}
		if strings.Contains(strings.ToLower(subtaskMeta.Name), "collect") || strings.Contains(strings.ToLower(subtaskMeta.Name), "clone git repo") {
			collectSubtaskNumber++
			isCollector = true
		} else {
			otherSubtaskNumber++
			isCollector = false
		}
		s := models.Subtask{
			Name:        subtaskCtx.GetName(),
			TaskID:      task.ID,
			IsCollector: isCollector,
		}
		if isCollector {
			s.Sequence = collectSubtaskNumber
		} else {
			s.Sequence = otherSubtaskNumber
		}
		subtask = append(subtask, s)
	}
	if err := basicRes.GetDal().CreateOrUpdate(subtask); err != nil {
		basicRes.GetLogger().Error(err, "error writing subtask list to DB")
	}

	// execute subtasks in order
	taskCtx.SetProgress(0, steps)
	subtaskNumber := 0
	for _, subtaskMeta := range subtaskMetas {
		subtaskCtx, err := taskCtx.SubTaskContext(subtaskMeta.Name)
		if err != nil {
			// sth went wrong
			return errors.Default.Wrap(err, fmt.Sprintf("error getting context subtask %s", subtaskMeta.Name))
		}
		subtaskNumber++
		if subtaskCtx == nil {
			// subtask was disabled
			continue
		}
		// run subtask
		if progress != nil {
			progress <- plugin.RunningProgress{
				Type:          plugin.SetCurrentSubTask,
				SubTaskName:   subtaskMeta.Name,
				SubTaskNumber: subtaskNumber,
			}
		}
		subtaskFinished := false
		if !subtaskMeta.ForceRunOnResume {
			if task.ID > 0 {
				sfc := errors.Must1(basicRes.GetDal().Count(
					dal.From(&models.Subtask{}), dal.Where("task_id = ? AND name = ? AND finished_at IS NOT NULL", task.ID, subtaskMeta.Name),
				),
				)
				subtaskFinished = sfc > 0
			}
		}
		if subtaskFinished {
			logger.Info("subtask %s already finished previously", subtaskMeta.Name)
		} else {
			logger.Info("executing subtask %s", subtaskMeta.Name)
			start := time.Now()
			err = runSubtask(basicRes, subtaskCtx, task.ID, subtaskNumber, subtaskMeta.EntryPoint)
			logger.Info("subtask %s finished in %d ms", subtaskMeta.Name, time.Since(start).Milliseconds())
			if err != nil {
				err = errors.SubtaskErr.Wrap(err, fmt.Sprintf("subtask %s ended unexpectedly", subtaskMeta.Name), errors.WithData(&subtaskMeta))
				logger.Error(err, "")
				where := dal.Where("task_id = ? and name = ?", task.ID, subtaskCtx.GetName())
				if err := basicRes.GetDal().UpdateColumns(subtask, []dal.DalSet{
					{ColumnName: "is_failed", Value: true},
					{ColumnName: "message", Value: err.Error()},
				}, where); err != nil {
					basicRes.GetLogger().Error(err, "error writing subtask %v status to DB", subtaskCtx.GetName())
				}
				return err
			}
		}
		taskCtx.IncProgress(1)
	}

	return nil
}