backend/core/runner/run_task.go (376 lines of code) (raw):
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package runner
import (
gocontext "context"
"fmt"
"strings"
"time"
"github.com/apache/incubator-devlake/core/models/common"
"github.com/apache/incubator-devlake/core/context"
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/log"
"github.com/apache/incubator-devlake/core/models"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/core/utils"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
contextimpl "github.com/apache/incubator-devlake/impls/context"
"github.com/apache/incubator-devlake/impls/logruslog"
)
// RunTask FIXME ...
func RunTask(
ctx gocontext.Context,
basicRes context.BasicRes,
progress chan plugin.RunningProgress,
taskId uint64,
) (err errors.Error) {
db := basicRes.GetDal()
task := &models.Task{}
if err := db.First(task, dal.Where("id = ?", taskId)); err != nil {
return err
}
dbPipeline := &models.Pipeline{}
if err := db.First(dbPipeline, dal.Where("id = ? ", task.PipelineId)); err != nil {
return err
}
logger, err := getTaskLogger(basicRes.GetLogger(), task)
if err != nil {
return err
}
beganAt := time.Now()
if task.BeganAt != nil {
beganAt = *task.BeganAt
}
// make sure task status always correct even if it panicked
defer func() {
if r := recover(); r != nil {
var e error
switch et := r.(type) {
case error:
e = et
default:
e = fmt.Errorf("%v", et)
}
err = errors.Default.Wrap(e, fmt.Sprintf("run task failed with panic (%s)", utils.GatherCallFrames(0)))
logger.Error(err, "run task failed with panic")
}
finishedAt := time.Now()
spentSeconds := finishedAt.Unix() - beganAt.Unix()
if err != nil {
lakeErr := errors.AsLakeErrorType(err)
subTaskName := "unknown"
if lakeErr = lakeErr.As(errors.SubtaskErr); lakeErr != nil {
if meta, ok := lakeErr.GetData().(*plugin.SubTaskMeta); ok {
subTaskName = meta.Name
}
} else {
lakeErr = errors.Convert(err)
}
dbe := db.UpdateColumns(task, []dal.DalSet{
{ColumnName: "status", Value: models.TASK_FAILED},
{ColumnName: "message", Value: lakeErr.Error()},
{ColumnName: "error_name", Value: lakeErr.Messages().Format()},
{ColumnName: "finished_at", Value: finishedAt},
{ColumnName: "spent_seconds", Value: spentSeconds},
{ColumnName: "failed_sub_task", Value: subTaskName},
})
if dbe != nil {
logger.Error(dbe, "failed to finalize task status into db (task failed)")
}
} else {
dbe := db.UpdateColumns(task, []dal.DalSet{
{ColumnName: "status", Value: models.TASK_COMPLETED},
{ColumnName: "message", Value: ""},
{ColumnName: "finished_at", Value: finishedAt},
{ColumnName: "spent_seconds", Value: spentSeconds},
})
if dbe != nil {
logger.Error(dbe, "failed to finalize task status into db (task succeeded)")
}
}
// update finishedTasks
errors.Must(db.UpdateColumn(
&models.Pipeline{},
"finished_tasks", dal.Expr("finished_tasks + 1"),
dal.Where("id=?", task.PipelineId),
))
// not return err if the `SkipOnFail` is true and the error is not canceled
if dbPipeline.SkipOnFail && !errors.Is(err, gocontext.Canceled) {
err = nil
}
}()
if task.Status == models.TASK_COMPLETED {
return nil
}
// start execution
logger.Info("start executing task: %d", task.ID)
dbe := db.UpdateColumns(task, []dal.DalSet{
{ColumnName: "status", Value: models.TASK_RUNNING},
{ColumnName: "message", Value: ""},
{ColumnName: "began_at", Value: beganAt},
})
if dbe != nil {
return dbe
}
err = RunPluginTask(
ctx,
basicRes.ReplaceLogger(logger),
task,
progress,
&dbPipeline.SyncPolicy,
)
return err
}
// RunPluginTask FIXME ...
func RunPluginTask(
ctx gocontext.Context,
basicRes context.BasicRes,
task *models.Task,
progress chan plugin.RunningProgress,
syncPolicy *models.SyncPolicy,
) errors.Error {
pluginMeta, err := plugin.GetPlugin(task.Plugin)
if err != nil {
return errors.Default.WrapRaw(err)
}
pluginTask, ok := pluginMeta.(plugin.PluginTask)
if !ok {
return errors.Default.New(fmt.Sprintf("plugin %s doesn't support PluginTask interface", task.Plugin))
}
return RunPluginSubTasks(
ctx,
basicRes,
task,
pluginTask,
progress,
syncPolicy,
)
}
// RunPluginSubTasks FIXME ...
func RunPluginSubTasks(
ctx gocontext.Context,
basicRes context.BasicRes,
task *models.Task,
pluginTask plugin.PluginTask,
progress chan plugin.RunningProgress,
syncPolicy *models.SyncPolicy,
) errors.Error {
logger := basicRes.GetLogger()
logger.Info("start plugin")
// find out all possible subtasks this plugin can offer
subtaskMetas := pluginTask.SubTaskMetas()
subtasksFlag := make(map[string]bool)
for _, subtaskMeta := range subtaskMetas {
subtasksFlag[subtaskMeta.Name] = subtaskMeta.EnabledByDefault
}
/* subtasksFlag example
subtasksFlag := map[string]bool{
"collectProject": true,
"convertCommits": true,
...
}
*/
// user specifies what subtasks to run
if len(task.Subtasks) != 0 {
// decode user specified subtasks
var specifiedTasks []string
err := api.Decode(task.Subtasks, &specifiedTasks, nil)
if err != nil {
return errors.Default.Wrap(err, "subtasks could not be decoded")
}
if len(specifiedTasks) > 0 {
// first, disable all subtasks
for task := range subtasksFlag {
subtasksFlag[task] = false
}
// second, check specified subtasks is valid and enable them if so
for _, task := range specifiedTasks {
if _, ok := subtasksFlag[task]; ok {
subtasksFlag[task] = true
} else {
return errors.Default.New(fmt.Sprintf("subtask %s does not exist", task))
}
}
}
}
// 1. make sure `Collect` subtasks skip if `SkipCollectors` is true
// 2. make sure `Required` subtasks are always enabled
for _, subtaskMeta := range subtaskMetas {
if syncPolicy != nil && syncPolicy.SkipCollectors && strings.Contains(strings.ToLower(subtaskMeta.Name), "collect") {
subtasksFlag[subtaskMeta.Name] = false
}
if subtaskMeta.Required {
subtasksFlag[subtaskMeta.Name] = true
}
}
// calculate total step(number of task to run)
steps := 0
for _, enabled := range subtasksFlag {
if enabled {
steps++
}
}
taskCtx := contextimpl.NewDefaultTaskContext(ctx, basicRes, task.Plugin, subtasksFlag, progress)
if closeablePlugin, ok := pluginTask.(plugin.CloseablePluginTask); ok {
defer closeablePlugin.Close(taskCtx)
}
options := task.Options
taskData, err := pluginTask.PrepareTaskData(taskCtx, options)
if err != nil {
return errors.Default.Wrap(err, fmt.Sprintf("error preparing task data for %s", task.Plugin))
}
taskCtx.SetSyncPolicy(syncPolicy)
taskCtx.SetData(taskData)
// record subtasks sequence to DB
collectSubtaskNumber := 0
otherSubtaskNumber := 0
isCollector := false
subtask := []models.Subtask{}
for _, subtaskMeta := range subtaskMetas {
subtaskCtx, err := taskCtx.SubTaskContext(subtaskMeta.Name)
if err != nil {
// sth went wrong
return errors.Default.Wrap(err, fmt.Sprintf("error getting context subtask %s", subtaskMeta.Name))
}
if subtaskCtx == nil {
// subtask was disabled
continue
}
if strings.Contains(strings.ToLower(subtaskMeta.Name), "collect") || strings.Contains(strings.ToLower(subtaskMeta.Name), "clone git repo") {
collectSubtaskNumber++
isCollector = true
} else {
otherSubtaskNumber++
isCollector = false
}
s := models.Subtask{
Name: subtaskCtx.GetName(),
TaskID: task.ID,
IsCollector: isCollector,
}
if isCollector {
s.Sequence = collectSubtaskNumber
} else {
s.Sequence = otherSubtaskNumber
}
subtask = append(subtask, s)
}
if err := basicRes.GetDal().CreateOrUpdate(subtask); err != nil {
basicRes.GetLogger().Error(err, "error writing subtask list to DB")
}
// execute subtasks in order
taskCtx.SetProgress(0, steps)
subtaskNumber := 0
for _, subtaskMeta := range subtaskMetas {
subtaskCtx, err := taskCtx.SubTaskContext(subtaskMeta.Name)
if err != nil {
// sth went wrong
return errors.Default.Wrap(err, fmt.Sprintf("error getting context subtask %s", subtaskMeta.Name))
}
subtaskNumber++
if subtaskCtx == nil {
// subtask was disabled
continue
}
// run subtask
if progress != nil {
progress <- plugin.RunningProgress{
Type: plugin.SetCurrentSubTask,
SubTaskName: subtaskMeta.Name,
SubTaskNumber: subtaskNumber,
}
}
subtaskFinished := false
if !subtaskMeta.ForceRunOnResume {
if task.ID > 0 {
sfc := errors.Must1(basicRes.GetDal().Count(
dal.From(&models.Subtask{}), dal.Where("task_id = ? AND name = ? AND finished_at IS NOT NULL", task.ID, subtaskMeta.Name),
),
)
subtaskFinished = sfc > 0
}
}
if subtaskFinished {
logger.Info("subtask %s already finished previously", subtaskMeta.Name)
} else {
logger.Info("executing subtask %s", subtaskMeta.Name)
start := time.Now()
err = runSubtask(basicRes, subtaskCtx, task.ID, subtaskNumber, subtaskMeta.EntryPoint)
logger.Info("subtask %s finished in %d ms", subtaskMeta.Name, time.Since(start).Milliseconds())
if err != nil {
err = errors.SubtaskErr.Wrap(err, fmt.Sprintf("subtask %s ended unexpectedly", subtaskMeta.Name), errors.WithData(&subtaskMeta))
logger.Error(err, "")
where := dal.Where("task_id = ? and name = ?", task.ID, subtaskCtx.GetName())
if err := basicRes.GetDal().UpdateColumns(subtask, []dal.DalSet{
{ColumnName: "is_failed", Value: true},
{ColumnName: "message", Value: err.Error()},
}, where); err != nil {
basicRes.GetLogger().Error(err, "error writing subtask %v status to DB", subtaskCtx.GetName())
}
return err
}
}
taskCtx.IncProgress(1)
}
return nil
}
// UpdateProgressDetail FIXME ...
func UpdateProgressDetail(basicRes context.BasicRes, taskId uint64, progressDetail *models.TaskProgressDetail, p *plugin.RunningProgress) {
cfg := basicRes.GetConfigReader()
skipSubtaskProgressUpdate := cfg.GetBool("SKIP_SUBTASK_PROGRESS")
task := &models.Task{
Model: common.Model{ID: taskId},
}
subtask := &models.Subtask{}
originalFinishedRecords := progressDetail.FinishedRecords
switch p.Type {
case plugin.TaskSetProgress:
progressDetail.TotalSubTasks = p.Total
progressDetail.FinishedSubTasks = p.Current
case plugin.TaskIncProgress:
progressDetail.FinishedSubTasks = p.Current
// TODO: get rid of db update
pct := float32(p.Current) / float32(p.Total)
err := basicRes.GetDal().UpdateColumn(task, "progress", pct)
if err != nil {
basicRes.GetLogger().Error(err, "failed to update progress")
}
case plugin.SubTaskSetProgress:
progressDetail.TotalRecords = p.Total
case plugin.SubTaskIncProgress:
progressDetail.FinishedRecords = p.Current
case plugin.SetCurrentSubTask:
progressDetail.SubTaskName = p.SubTaskName
progressDetail.SubTaskNumber = p.SubTaskNumber
// reset finished records
progressDetail.FinishedRecords = 0
}
if skipSubtaskProgressUpdate {
return
}
currentFinishedRecords := progressDetail.FinishedRecords
currentTotalRecords := progressDetail.TotalRecords
// update progress if progress is more than 1%
// or there is progress if no total record provided
if (currentTotalRecords > 0 && float64(currentFinishedRecords-originalFinishedRecords)/float64(currentTotalRecords) > 0.01) || (currentTotalRecords <= 0 && currentFinishedRecords > originalFinishedRecords) {
// update subtask progress
where := dal.Where("task_id = ? and name = ?", taskId, progressDetail.SubTaskName)
err := basicRes.GetDal().UpdateColumns(subtask, []dal.DalSet{
{ColumnName: "finished_records", Value: progressDetail.FinishedRecords},
}, where)
if err != nil {
basicRes.GetLogger().Error(err, "failed to update _devlake_subtasks progress")
}
}
}
func runSubtask(
basicRes context.BasicRes,
ctx plugin.SubTaskContext,
parentID uint64,
subtaskNumber int,
entryPoint plugin.SubTaskEntryPoint,
) errors.Error {
beginAt := time.Now()
subtask := &models.Subtask{
Name: ctx.GetName(),
TaskID: parentID,
Number: subtaskNumber,
BeganAt: &beginAt,
}
recordSubtask(basicRes, subtask)
// defer to record subtask status
defer func() {
finishedAt := time.Now()
subtask.FinishedAt = &finishedAt
subtask.SpentSeconds = finishedAt.Unix() - beginAt.Unix()
recordSubtask(basicRes, subtask)
}()
return entryPoint(ctx)
}
func recordSubtask(basicRes context.BasicRes, subtask *models.Subtask) {
where := dal.Where("task_id = ? and name = ?", subtask.TaskID, subtask.Name)
if err := basicRes.GetDal().UpdateColumns(subtask, []dal.DalSet{
{ColumnName: "began_at", Value: subtask.BeganAt},
{ColumnName: "finished_at", Value: subtask.FinishedAt},
{ColumnName: "spent_seconds", Value: subtask.SpentSeconds},
//{ColumnName: "finished_records", Value: subtask.FinishedRecords}, // FinishedRecords is zero always.
{ColumnName: "number", Value: subtask.Number},
}, where); err != nil {
basicRes.GetLogger().Error(err, "error writing subtask %d status to DB: %v", subtask.ID)
}
}
func getTaskLogger(parentLogger log.Logger, task *models.Task) (log.Logger, errors.Error) {
logger := parentLogger.Nested(fmt.Sprintf("task #%d", task.ID))
loggingPath := logruslog.GetTaskLoggerPath(logger.GetConfig(), task)
stream, err := logruslog.GetFileStream(loggingPath)
if err != nil {
return nil, err
}
logger.SetStream(&log.LoggerStreamConfig{
Path: loggingPath,
Writer: stream,
})
return logger, nil
}