in backend/core/runner/run_task.go [176:349]
func RunPluginSubTasks(
ctx gocontext.Context,
basicRes context.BasicRes,
task *models.Task,
pluginTask plugin.PluginTask,
progress chan plugin.RunningProgress,
syncPolicy *models.SyncPolicy,
) errors.Error {
logger := basicRes.GetLogger()
logger.Info("start plugin")
// find out all possible subtasks this plugin can offer
subtaskMetas := pluginTask.SubTaskMetas()
subtasksFlag := make(map[string]bool)
for _, subtaskMeta := range subtaskMetas {
subtasksFlag[subtaskMeta.Name] = subtaskMeta.EnabledByDefault
}
/* subtasksFlag example
subtasksFlag := map[string]bool{
"collectProject": true,
"convertCommits": true,
...
}
*/
// user specifies what subtasks to run
if len(task.Subtasks) != 0 {
// decode user specified subtasks
var specifiedTasks []string
err := api.Decode(task.Subtasks, &specifiedTasks, nil)
if err != nil {
return errors.Default.Wrap(err, "subtasks could not be decoded")
}
if len(specifiedTasks) > 0 {
// first, disable all subtasks
for task := range subtasksFlag {
subtasksFlag[task] = false
}
// second, check specified subtasks is valid and enable them if so
for _, task := range specifiedTasks {
if _, ok := subtasksFlag[task]; ok {
subtasksFlag[task] = true
} else {
return errors.Default.New(fmt.Sprintf("subtask %s does not exist", task))
}
}
}
}
// 1. make sure `Collect` subtasks skip if `SkipCollectors` is true
// 2. make sure `Required` subtasks are always enabled
for _, subtaskMeta := range subtaskMetas {
if syncPolicy != nil && syncPolicy.SkipCollectors && strings.Contains(strings.ToLower(subtaskMeta.Name), "collect") {
subtasksFlag[subtaskMeta.Name] = false
}
if subtaskMeta.Required {
subtasksFlag[subtaskMeta.Name] = true
}
}
// calculate total step(number of task to run)
steps := 0
for _, enabled := range subtasksFlag {
if enabled {
steps++
}
}
taskCtx := contextimpl.NewDefaultTaskContext(ctx, basicRes, task.Plugin, subtasksFlag, progress)
if closeablePlugin, ok := pluginTask.(plugin.CloseablePluginTask); ok {
defer closeablePlugin.Close(taskCtx)
}
options := task.Options
taskData, err := pluginTask.PrepareTaskData(taskCtx, options)
if err != nil {
return errors.Default.Wrap(err, fmt.Sprintf("error preparing task data for %s", task.Plugin))
}
taskCtx.SetSyncPolicy(syncPolicy)
taskCtx.SetData(taskData)
// record subtasks sequence to DB
collectSubtaskNumber := 0
otherSubtaskNumber := 0
isCollector := false
subtask := []models.Subtask{}
for _, subtaskMeta := range subtaskMetas {
subtaskCtx, err := taskCtx.SubTaskContext(subtaskMeta.Name)
if err != nil {
// sth went wrong
return errors.Default.Wrap(err, fmt.Sprintf("error getting context subtask %s", subtaskMeta.Name))
}
if subtaskCtx == nil {
// subtask was disabled
continue
}
if strings.Contains(strings.ToLower(subtaskMeta.Name), "collect") || strings.Contains(strings.ToLower(subtaskMeta.Name), "clone git repo") {
collectSubtaskNumber++
isCollector = true
} else {
otherSubtaskNumber++
isCollector = false
}
s := models.Subtask{
Name: subtaskCtx.GetName(),
TaskID: task.ID,
IsCollector: isCollector,
}
if isCollector {
s.Sequence = collectSubtaskNumber
} else {
s.Sequence = otherSubtaskNumber
}
subtask = append(subtask, s)
}
if err := basicRes.GetDal().CreateOrUpdate(subtask); err != nil {
basicRes.GetLogger().Error(err, "error writing subtask list to DB")
}
// execute subtasks in order
taskCtx.SetProgress(0, steps)
subtaskNumber := 0
for _, subtaskMeta := range subtaskMetas {
subtaskCtx, err := taskCtx.SubTaskContext(subtaskMeta.Name)
if err != nil {
// sth went wrong
return errors.Default.Wrap(err, fmt.Sprintf("error getting context subtask %s", subtaskMeta.Name))
}
subtaskNumber++
if subtaskCtx == nil {
// subtask was disabled
continue
}
// run subtask
if progress != nil {
progress <- plugin.RunningProgress{
Type: plugin.SetCurrentSubTask,
SubTaskName: subtaskMeta.Name,
SubTaskNumber: subtaskNumber,
}
}
subtaskFinished := false
if !subtaskMeta.ForceRunOnResume {
if task.ID > 0 {
sfc := errors.Must1(basicRes.GetDal().Count(
dal.From(&models.Subtask{}), dal.Where("task_id = ? AND name = ? AND finished_at IS NOT NULL", task.ID, subtaskMeta.Name),
),
)
subtaskFinished = sfc > 0
}
}
if subtaskFinished {
logger.Info("subtask %s already finished previously", subtaskMeta.Name)
} else {
logger.Info("executing subtask %s", subtaskMeta.Name)
start := time.Now()
err = runSubtask(basicRes, subtaskCtx, task.ID, subtaskNumber, subtaskMeta.EntryPoint)
logger.Info("subtask %s finished in %d ms", subtaskMeta.Name, time.Since(start).Milliseconds())
if err != nil {
err = errors.SubtaskErr.Wrap(err, fmt.Sprintf("subtask %s ended unexpectedly", subtaskMeta.Name), errors.WithData(&subtaskMeta))
logger.Error(err, "")
where := dal.Where("task_id = ? and name = ?", task.ID, subtaskCtx.GetName())
if err := basicRes.GetDal().UpdateColumns(subtask, []dal.DalSet{
{ColumnName: "is_failed", Value: true},
{ColumnName: "message", Value: err.Error()},
}, where); err != nil {
basicRes.GetLogger().Error(err, "error writing subtask %v status to DB", subtaskCtx.GetName())
}
return err
}
}
taskCtx.IncProgress(1)
}
return nil
}