backend/server/services/task.go (321 lines of code) (raw):
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package services
import (
"context"
"fmt"
"math"
"strings"
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/log"
"github.com/apache/incubator-devlake/core/models"
"github.com/apache/incubator-devlake/impls/logruslog"
)
// taskLog is the logger used by the functions in this file (for example
// createTask and RunTasksStandalone); it is a logger nested under
// logruslog.Global with the name "task service".
var taskLog = logruslog.Global.Nested("task service")
// TaskQuery holds the filter and pagination options accepted by GetTasks.
// The `form`/`uri` tags name the request parameters each field is bound from.
// A zero value in any filter field means "do not filter on it".
type TaskQuery struct {
Pagination // supplies the skip and page size GetTasks turns into OFFSET/LIMIT
Status string `form:"status"` // when non-empty, only tasks whose status equals this value
Plugin string `form:"plugin"` // when non-empty, only tasks of this plugin
PipelineId uint64 `form:"pipelineId" uri:"pipelineId"` // when > 0, only tasks of this pipeline
Pending int `form:"pending"` // when > 0, only tasks whose finished_at is NULL
}
// createTask inserts a new task row, built from newTask, through the given
// transaction and returns the stored task.
//
// The task starts in models.TASK_CREATED status, or models.TASK_RERUN when
// newTask.IsRerun is set. If the insert fails, the failure is logged and
// returned wrapped as an internal error.
func createTask(newTask *models.NewTask, tx dal.Transaction) (*models.Task, errors.Error) {
	initialStatus := models.TASK_CREATED
	if newTask.IsRerun {
		initialStatus = models.TASK_RERUN
	}
	task := &models.Task{
		Plugin:      newTask.Plugin,
		Subtasks:    newTask.Subtasks,
		Options:     newTask.Options,
		Status:      initialStatus,
		Message:     "",
		PipelineId:  newTask.PipelineId,
		PipelineRow: newTask.PipelineRow,
		PipelineCol: newTask.PipelineCol,
	}
	if createErr := tx.Create(task); createErr != nil {
		taskLog.Error(createErr, "save task failed")
		return nil, errors.Internal.Wrap(createErr, "save task failed")
	}
	return task, nil
}
// GetTasks returns one page of tasks matching query, ordered by descending id,
// along with the total number of matching tasks regardless of pagination.
//
// query is validated with VerifyStruct first. Only non-zero filters of the
// query are applied. The page bounds come from query.GetSkip() and
// query.GetPageSizeOr(10000) (presumably 10000 is the fallback page size when
// none is given — confirm in Pagination). The returned tasks are passed
// through runningTasks.FillProgressDetailToTasks before being returned.
func GetTasks(query *TaskQuery) ([]*models.Task, int64, errors.Error) {
	if verifyErr := VerifyStruct(query); verifyErr != nil {
		return nil, 0, verifyErr
	}

	// Each optional filter is only added when its query field is set; the
	// order here matches the order the clauses are appended.
	optionalFilters := []struct {
		enabled bool
		clause  dal.Clause
	}{
		{query.Status != "", dal.Where("status = ?", query.Status)},
		{query.Plugin != "", dal.Where("plugin = ?", query.Plugin)},
		{query.PipelineId > 0, dal.Where("pipeline_id = ?", query.PipelineId)},
		{query.Pending > 0, dal.Where("finished_at is null")},
	}
	clauses := []dal.Clause{dal.From(&models.Task{})}
	for _, filter := range optionalFilters {
		if filter.enabled {
			clauses = append(clauses, filter.clause)
		}
	}

	total, countErr := db.Count(clauses...)
	if countErr != nil {
		return nil, 0, countErr
	}

	clauses = append(clauses,
		dal.Orderby("id DESC"),
		dal.Offset(query.GetSkip()),
		dal.Limit(query.GetPageSizeOr(10000)),
	)
	// Non-nil empty slice so an empty page is not nil.
	tasks := make([]*models.Task, 0)
	if loadErr := db.All(&tasks, clauses...); loadErr != nil {
		return nil, total, loadErr
	}

	runningTasks.FillProgressDetailToTasks(tasks)
	return tasks, total, nil
}
// GetTasksWithLastStatus returns the tasks of the given pipeline, keeping only
// the task with the highest id for each (PipelineRow, PipelineCol) position.
// The result is ordered by descending id.
//
// When shouldSanitize is true, the Options of every returned task are replaced
// by the output of SanitizePluginOption; a sanitization failure aborts the call.
// tx may be nil, in which case the package-level db is used. Progress details
// of running tasks are filled into the result before it is returned.
//
// TODO: adopts GetLatestTasksOfPipeline
func GetTasksWithLastStatus(pipelineId uint64, shouldSanitize bool, tx dal.Dal) ([]*models.Task, errors.Error) {
	if tx == nil {
		tx = db
	}
	var tasks []*models.Task
	err := tx.All(&tasks, dal.Where("pipeline_id = ?", pipelineId), dal.Orderby("id DESC"))
	if err != nil {
		return nil, err
	}

	// Key tasks by their plan position directly. An arithmetic encoding such as
	// row*maxCol+col collides when column indices can be 0 (e.g. (0, maxCol) vs
	// (1, 0)), which would silently drop tasks.
	type position struct{ row, col int }
	seen := make(map[position]struct{}, len(tasks))
	var result []*models.Task
	for _, task := range tasks {
		pos := position{row: task.PipelineRow, col: task.PipelineCol}
		if _, dup := seen[pos]; dup {
			// Rows are ordered by id DESC, so a higher-id task at this
			// position has already been kept.
			continue
		}
		seen[pos] = struct{}{}
		// Only tasks that are actually returned need sanitizing.
		if shouldSanitize {
			taskOption, sanitizeErr := SanitizePluginOption(task.Plugin, task.Options)
			if sanitizeErr != nil {
				return nil, errors.Convert(sanitizeErr)
			}
			task.Options = taskOption
		}
		result = append(result, task)
	}
	runningTasks.FillProgressDetailToTasks(result)
	return result, nil
}
// GetTask loads the task with the given id from the database.
//
// It returns a NotFound error when no such task exists, and an Internal error
// wrapping the database error for any other failure.
func GetTask(taskId uint64) (*models.Task, errors.Error) {
	var task models.Task
	if dbErr := db.First(&task, dal.Where("id = ?", taskId)); dbErr != nil {
		if !db.IsErrorNotFound(dbErr) {
			return nil, errors.Internal.Wrap(dbErr, "error getting the task from database")
		}
		return nil, errors.NotFound.New("task not found")
	}
	return &task, nil
}
// CancelTask removes the task with the given id from runningTasks and invokes
// the cancel function returned for it.
//
// If the removal fails, that error is returned unchanged and nothing is
// cancelled (presumably this is the case when the task is not running —
// confirm against runningTasks.Remove).
func CancelTask(taskId uint64) errors.Error {
	cancelFn, removeErr := runningTasks.Remove(taskId)
	if removeErr != nil {
		return removeErr
	}
	cancelFn()
	return nil
}
// RunTasksStandalone runs every task in taskIds concurrently, one goroutine per
// task via runTaskStandalone, and waits until all of them have reported back.
//
// Each failure is logged through taskLog. If any failure matches
// context.Canceled, "task canceled" is logged through parentLogger and the
// first such failure (in completion order) is returned. Otherwise all failure
// messages are joined, one per line, into a single error. nil is returned when
// taskIds is empty or every task succeeds.
func RunTasksStandalone(parentLogger log.Logger, taskIds []uint64) errors.Error {
	total := len(taskIds)
	if total == 0 {
		return nil
	}

	// Buffered to the number of workers: every goroutine sends exactly one
	// outcome and can exit without waiting for the receiver.
	outcomes := make(chan error, total)
	for _, taskId := range taskIds {
		go func(id uint64) {
			taskLog.Info("run task #%d in background ", id)
			if runErr := runTaskStandalone(parentLogger, id); runErr != nil {
				outcomes <- errors.Default.Wrap(runErr, fmt.Sprintf("Error running task %d.", id))
				return
			}
			outcomes <- nil
		}(taskId)
	}

	var failures []error
	for received := 0; received < total; received++ {
		if outcome := <-outcomes; outcome != nil {
			taskLog.Error(outcome, "task failed")
			failures = append(failures, outcome)
		}
	}

	// A cancellation takes precedence over the aggregated failure report.
	for _, failure := range failures {
		if errors.Is(failure, context.Canceled) {
			parentLogger.Info("task canceled")
			return errors.Convert(failure)
		}
	}

	var combined error
	if len(failures) > 0 {
		var sb strings.Builder
		for _, failure := range failures {
			_, _ = sb.WriteString(failure.Error())
			_, _ = sb.WriteString("\n")
		}
		combined = errors.Default.New(sb.String())
	}
	return errors.Convert(combined)
}
// RerunTask reruns the task with the given id by calling RerunPipeline for the
// task's pipeline. It returns the first task RerunPipeline produced, with its
// Options replaced by the output of SanitizePluginOption.
//
// Errors from GetTask and RerunPipeline are returned as-is. An internal error
// is returned when RerunPipeline produces no task. A sanitization failure is
// returned converted to errors.Error.
func RerunTask(taskId uint64) (*models.Task, errors.Error) {
	task, err := GetTask(taskId)
	if err != nil {
		return nil, err
	}
	rerunTasks, err := RerunPipeline(task.PipelineId, task)
	if err != nil {
		return nil, err
	}
	// Guard the index below; indexing an empty slice would panic.
	if len(rerunTasks) == 0 {
		return nil, errors.Internal.New(fmt.Sprintf("no task was created when rerunning task %d", taskId))
	}
	rerunTask := rerunTasks[0]
	taskOption, sanitizeErr := SanitizePluginOption(rerunTask.Plugin, rerunTask.Options)
	if sanitizeErr != nil {
		// Report the sanitization error itself: err is nil at this point, so
		// converting it would hand the caller (nil, nil).
		return nil, errors.Convert(sanitizeErr)
	}
	rerunTask.Options = taskOption
	return rerunTask, nil
}
// GetSubTasksInfo summarizes the subtasks of a pipeline.
//
// Only the task chosen by filterTasksWithLastStatus for each plan position is
// considered, and tasks of the "org", "refdiff" and "dora" plugins are skipped.
// For every remaining task the result holds the task's details and the details
// of all of its subtasks. The summary fields are:
//   - Count: the number of tasks summarized;
//   - CompletionRate: the fraction of collector subtasks (is_collector = true)
//     that have a finished_at, rounded to two decimals, or 1 when there are no
//     collector subtasks;
//   - Status: the task statuses folded by getTaskStatus.
//
// When shouldSanitize is true, task options are passed through
// SanitizePluginOption. tx may be nil, in which case the package-level db is
// used. Database errors are returned to the caller rather than panicking.
func GetSubTasksInfo(pipelineId uint64, shouldSanitize bool, tx dal.Dal) (*models.SubTasksOuput, errors.Error) {
	if tx == nil {
		tx = db
	}
	var tasks []*models.Task
	err := tx.All(&tasks, dal.Where("pipeline_id = ?", pipelineId), dal.Orderby("id"))
	if err != nil {
		return nil, err
	}
	lastTasks := filterTasksWithLastStatus(tasks)

	var subtasksInfo []models.SubtasksInfo
	var totalSubtasksCount int64
	var totalFinishedSubTasksCount int64
	var statuses []string
	for _, task := range lastTasks {
		// These plugins are excluded from the subtask summary.
		if task.Plugin == "org" || task.Plugin == "refdiff" || task.Plugin == "dora" {
			continue
		}
		subTaskResult := models.SubtasksInfo{
			ID:           task.ID,
			PipelineID:   task.PipelineId,
			CreatedAt:    task.CreatedAt,
			UpdatedAt:    task.UpdatedAt,
			BeganAt:      task.BeganAt,
			FinishedAt:   task.FinishedAt,
			Plugin:       task.Plugin,
			Status:       task.Status,
			Message:      task.Message,
			ErrorName:    task.ErrorName,
			SpentSeconds: task.SpentSeconds,
		}
		if shouldSanitize {
			taskOption, sanitizeErr := SanitizePluginOption(task.Plugin, task.Options)
			if sanitizeErr != nil {
				return nil, errors.Convert(sanitizeErr)
			}
			subTaskResult.Options = taskOption
		}

		subtasks := []*models.Subtask{}
		err = tx.All(&subtasks, dal.Where("task_id = ?", task.ID))
		if err != nil {
			return nil, err
		}
		for _, subtask := range subtasks {
			subTaskResult.SubtaskDetails = append(subTaskResult.SubtaskDetails, &models.SubtaskDetails{
				ID:              subtask.ID,
				CreatedAt:       subtask.CreatedAt,
				UpdatedAt:       subtask.UpdatedAt,
				TaskID:          subtask.TaskID,
				Name:            subtask.Name,
				Number:          subtask.Number,
				BeganAt:         subtask.BeganAt,
				FinishedAt:      subtask.FinishedAt,
				SpentSeconds:    subtask.SpentSeconds,
				FinishedRecords: subtask.FinishedRecords,
				Sequence:        subtask.Sequence,
				IsCollector:     subtask.IsCollector,
				IsFailed:        subtask.IsFailed,
				Message:         subtask.Message,
			})
		}
		subtasksInfo = append(subtasksInfo, subTaskResult)

		// Count errors are returned instead of panicking (previously errors.Must1).
		collectorCount, countErr := tx.Count(dal.From("_devlake_subtasks"), dal.Where("task_id = ? and is_collector = true", task.ID))
		if countErr != nil {
			return nil, countErr
		}
		totalSubtasksCount += collectorCount
		finishedCount, countErr := tx.Count(dal.From("_devlake_subtasks"), dal.Where("task_id = ? and is_collector = true and finished_at is not null", task.ID))
		if countErr != nil {
			return nil, countErr
		}
		totalFinishedSubTasksCount += finishedCount

		statuses = append(statuses, task.Status)
	}

	// With no collector subtasks there is nothing left to collect: report 1
	// instead of dividing by zero.
	completionRate := 1.0
	if totalSubtasksCount > 0 {
		completionRate = math.Round(float64(totalFinishedSubTasksCount)/float64(totalSubtasksCount)*100) / 100
	}

	// NOTE(review): Count is the number of summarized tasks, not the number of
	// subtasks; confirm this is what API consumers expect.
	return &models.SubTasksOuput{
		SubtasksInfo:   subtasksInfo,
		Count:          int64(len(subtasksInfo)),
		CompletionRate: completionRate,
		Status:         getTaskStatus(statuses),
	}, nil
}
// filterTasksWithLastStatus keeps exactly one task per pipeline position, where
// a position is the (PipelineId, PipelineRow, PipelineCol) triple.
//
// For each position the kept task is the one with the latest non-nil BeganAt.
// If no task at a position has begun, the first such task in input order is
// kept; ties on BeganAt also keep the earlier task. The result is ordered by
// the first occurrence of each position in tasks, and each position appears
// exactly once (the original version emitted the winner once per input task
// sharing its position, duplicating entries).
func filterTasksWithLastStatus(tasks []*models.Task) []*models.Task {
	latest := make(map[string]*models.Task, len(tasks))
	// Each position key, recorded the first time it is seen, to keep output order stable.
	var order []string
	for _, task := range tasks {
		key := fmt.Sprintf("%d-%d-%d", task.PipelineId, task.PipelineRow, task.PipelineCol)
		current, seen := latest[key]
		if !seen {
			latest[key] = task
			order = append(order, key)
			continue
		}
		if task.BeganAt != nil && (current.BeganAt == nil || task.BeganAt.After(*current.BeganAt)) {
			latest[key] = task
		}
	}
	filtered := make([]*models.Task, 0, len(order))
	for _, key := range order {
		filtered = append(filtered, latest[key])
	}
	return filtered
}
// getTaskStatus folds a list of task statuses into one overall status:
//   - "" when the list is empty;
//   - "TASK_PARTIAL" when at least one task failed and at least one completed;
//   - models.TASK_FAILED when every task failed;
//   - models.TASK_COMPLETED when every task completed;
//   - models.TASK_RUNNING in every other case.
func getTaskStatus(statuses []string) string {
	total := len(statuses)
	if total == 0 {
		return ""
	}
	var failed, completed int
	for _, s := range statuses {
		switch s {
		case models.TASK_FAILED:
			failed++
		case models.TASK_COMPLETED:
			completed++
		}
	}
	switch {
	case failed > 0 && completed > 0:
		return "TASK_PARTIAL"
	case failed == total:
		return models.TASK_FAILED
	case completed == total:
		return models.TASK_COMPLETED
	default:
		return models.TASK_RUNNING
	}
}