// backend/server/services/pipeline.go
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package services
import (
"context"
"fmt"
"net/url"
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/core/utils"
"github.com/apache/incubator-devlake/helpers/dbhelper"
"github.com/apache/incubator-devlake/impls/logruslog"
"github.com/google/uuid"
"github.com/spf13/cast"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)
var defaultNotificationService *DefaultPipelineNotificationService
var globalPipelineLog = logruslog.Global.Nested("pipeline service")
var pluginOptionSanitizers = map[string]func(map[string]interface{}){
"gitextractor": func(options map[string]interface{}) {
if v, ok := options["url"]; ok {
gitUrl := cast.ToString(v)
u, err := url.Parse(gitUrl)
if err != nil {
logger.Error(err, "failed to parse git url", gitUrl)
}
if u != nil && u.User != nil {
password, ok := u.User.Password()
if ok {
escapedUrl, err := url.QueryUnescape(gitUrl)
if err != nil {
logger.Warn(err, "failed to unescape url %s", gitUrl)
} else {
gitUrl = escapedUrl
}
gitUrl = strings.ReplaceAll(gitUrl, password, strings.Repeat("*", len(password)))
options["url"] = gitUrl
}
}
}
},
}
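// Illustrative only: given a gitextractor option such as (values hypothetical)
//
//	options["url"] = "https://user:s3cret@git.example.com/org/repo.git"
//
// the sanitizer above rewrites the url to
//
//	"https://user:******@git.example.com/org/repo.git"
//
// so credentials never leak into logs or API responses.
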
// PipelineQuery is a query for GetPipelines
type PipelineQuery struct {
Pagination
Status string `form:"status"`
Pending int `form:"pending"`
BlueprintId uint64 `uri:"blueprintId" form:"blueprint_id"`
Label string `form:"label"`
}
func pipelineServiceInit() {
// initialize plugins
plugin.InitPlugins(basicRes)
// notification
var notificationEndpoint = cfg.GetString("NOTIFICATION_ENDPOINT")
var notificationSecret = cfg.GetString("NOTIFICATION_SECRET")
if strings.TrimSpace(notificationEndpoint) != "" {
defaultNotificationService = NewDefaultPipelineNotificationService(notificationEndpoint, notificationSecret)
}
// standalone mode: reset pipeline status
if cfg.GetBool("RESUME_PIPELINES") {
markInterruptedPipelineAs(models.TASK_RESUME)
} else {
markInterruptedPipelineAs(models.TASK_FAILED)
}
// load cronjobs for blueprints
errors.Must(ReloadBlueprints())
var pipelineMaxParallel = cfg.GetInt64("PIPELINE_MAX_PARALLEL")
if pipelineMaxParallel < 0 {
panic(errors.BadInput.New(`PIPELINE_MAX_PARALLEL must be zero or a positive integer`))
}
if pipelineMaxParallel == 0 {
globalPipelineLog.Warn(nil, `PIPELINE_MAX_PARALLEL=0 means pipelines run with no concurrency limit`)
pipelineMaxParallel = 10000
}
// consume pipelines in an independent goroutine
if cfg.GetBool("CONSUME_PIPELINES") {
go RunPipelineInQueue(pipelineMaxParallel)
}
}
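// For reference, pipelineServiceInit reads its settings from the service
// configuration; a minimal sketch of the relevant environment variables
// (values are illustrative, not defaults):
//
//	NOTIFICATION_ENDPOINT=https://example.com/devlake/webhook
//	NOTIFICATION_SECRET=changeme
//	RESUME_PIPELINES=true     # resume interrupted pipelines instead of failing them
//	PIPELINE_MAX_PARALLEL=4   # 0 falls back to the 10000 cap above
//	CONSUME_PIPELINES=true    # start the queue consumer goroutine
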
func markInterruptedPipelineAs(status string) {
errors.Must(db.UpdateColumns(
&models.Pipeline{},
[]dal.DalSet{
{ColumnName: "status", Value: status},
},
dal.Where("status = ?", models.TASK_RUNNING),
))
errors.Must(db.UpdateColumns(
&models.Task{},
[]dal.DalSet{
{ColumnName: "status", Value: status},
},
dal.Where("status = ?", models.TASK_RUNNING),
))
}
// CreatePipeline creates a new pipeline and returns the model
func CreatePipeline(newPipeline *models.NewPipeline, shouldSanitize bool) (*models.Pipeline, errors.Error) {
pipeline, err := CreateDbPipeline(newPipeline)
if err != nil {
return nil, errors.Convert(err)
}
if shouldSanitize {
if err := SanitizePipeline(pipeline); err != nil {
return nil, errors.Convert(err)
}
}
return pipeline, nil
}
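// A minimal usage sketch (the NewPipeline fields shown are assumptions, not a
// complete construction):
//
//	newPipeline := &models.NewPipeline{Name: "my pipeline", Plan: plan}
//	pipeline, err := CreatePipeline(newPipeline, true)
//	if err != nil {
//		return err
//	}
//	fmt.Printf("created pipeline #%d\n", pipeline.ID)
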
func SanitizeBlueprint(blueprint *models.Blueprint) error {
for planStageIdx, pipelineStage := range blueprint.Plan {
for planTaskIdx := range pipelineStage {
pipelineTask, err := SanitizeTask(blueprint.Plan[planStageIdx][planTaskIdx])
if err != nil {
return err
}
blueprint.Plan[planStageIdx][planTaskIdx] = pipelineTask
}
}
return nil
}
func SanitizePipeline(pipeline *models.Pipeline) error {
for planStageIdx, pipelineStage := range pipeline.Plan {
for planTaskIdx := range pipelineStage {
task := pipeline.Plan[planStageIdx][planTaskIdx]
pipelineTask, err := SanitizeTask(task)
if err != nil {
return err
}
pipeline.Plan[planStageIdx][planTaskIdx] = pipelineTask
}
}
return nil
}
func SanitizeTask(pipelineTask *models.PipelineTask) (*models.PipelineTask, error) {
pluginName := pipelineTask.Plugin
options, err := SanitizePluginOption(pluginName, pipelineTask.Options)
if err != nil {
return pipelineTask, err
}
pipelineTask.Options = options
return pipelineTask, nil
}
func SanitizePluginOption(pluginName string, option map[string]interface{}) (map[string]interface{}, error) {
if sanitizer, ok := pluginOptionSanitizers[pluginName]; ok {
sanitizer(option)
}
return option, nil
}
// GetPipelines returns the pipelines matching the query, along with the total count
func GetPipelines(query *PipelineQuery, shouldSanitize bool) ([]*models.Pipeline, int64, errors.Error) {
pipelines, i, err := GetDbPipelines(query)
if err != nil {
return nil, 0, errors.Convert(err)
}
g := new(errgroup.Group)
for idx, p := range pipelines {
tmpPipeline := *p
tmpIdx := idx
g.Go(func() error {
// use a local error to avoid a data race on the shared err variable
if err := fillPipelineDetail(&tmpPipeline); err != nil {
return err
}
if shouldSanitize {
if err := SanitizePipeline(&tmpPipeline); err != nil {
return err
}
}
pipelines[tmpIdx] = &tmpPipeline
return nil
})
}
if err := g.Wait(); err != nil {
return nil, 0, errors.Convert(err)
}
return pipelines, i, nil
}
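// A hypothetical caller paging through failed pipelines of one blueprint
// (the Pagination field names are assumptions):
//
//	pipelines, total, err := GetPipelines(&PipelineQuery{
//		Pagination:  Pagination{Page: 1, PageSize: 20},
//		Status:      models.TASK_FAILED,
//		BlueprintId: 42,
//	}, true)
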
// GetPipeline returns the pipeline with the given id
func GetPipeline(pipelineId uint64, shouldSanitize bool) (*models.Pipeline, errors.Error) {
dbPipeline, err := GetDbPipeline(pipelineId)
if err != nil {
return nil, err
}
err = fillPipelineDetail(dbPipeline)
if err != nil {
return nil, err
}
if shouldSanitize {
if err := SanitizePipeline(dbPipeline); err != nil {
return nil, errors.Convert(err)
}
}
return dbPipeline, nil
}
// GetPipelineLogsArchivePath creates an archive for the logs of this pipeline and returns its file path
func GetPipelineLogsArchivePath(pipeline *models.Pipeline) (string, errors.Error) {
logPath, err := getPipelineLogsPath(pipeline)
if err != nil {
return "", err
}
archive := fmt.Sprintf("%s/%s/logging.tar.gz", os.TempDir(), uuid.New())
if err = utils.CreateGZipArchive(archive, fmt.Sprintf("%s/*", logPath)); err != nil {
return "", err
}
return archive, nil
}
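// A minimal usage sketch: build the archive, stream it, then clean up (the
// HTTP handler context is assumed, not part of this file):
//
//	archive, err := GetPipelineLogsArchivePath(pipeline)
//	if err != nil {
//		return err
//	}
//	defer os.Remove(archive)
//	http.ServeFile(w, r, archive)
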
func dequeuePipeline(runningParallelLabels []string) (pipeline *models.Pipeline, err errors.Error) {
txHelper := dbhelper.NewTxHelper(basicRes, &err)
defer txHelper.End()
tx := txHelper.Begin()
// acquire a MySQL read lock; whether this works for PostgreSQL is unverified
errors.Must(tx.LockTables(dal.LockTables{
{Table: "_devlake_pipelines", Exclusive: false},
{Table: "_devlake_pipeline_labels", Exclusive: false},
}))
// prepare query to find an appropriate pipeline to execute
pipeline = &models.Pipeline{}
err = tx.First(pipeline,
dal.Where("status IN ?", []string{models.TASK_CREATED, models.TASK_RERUN, models.TASK_RESUME}),
dal.Join(
`left join _devlake_pipeline_labels ON
_devlake_pipeline_labels.pipeline_id = _devlake_pipelines.id AND
_devlake_pipeline_labels.name LIKE 'parallel/%' AND
_devlake_pipeline_labels.name in ?`,
runningParallelLabels,
),
dal.Groupby("id"),
dal.Having("count(_devlake_pipeline_labels.name)=0"),
dal.Select("id"),
dal.Orderby("id ASC"),
dal.Limit(1),
)
if err == nil {
// mark the pipeline running, now we want a write lock
if pipeline.BeganAt == nil {
now := time.Now()
pipeline.BeganAt = &now
} else {
// a non-nil BeganAt means the pipeline was interrupted and is being resumed
globalPipelineLog.Info("resumed pipeline #%d", pipeline.ID)
}
errors.Must(tx.LockTables(dal.LockTables{{Table: "_devlake_pipelines", Exclusive: true}}))
errors.Must(tx.UpdateColumns(&models.Pipeline{}, []dal.DalSet{
{ColumnName: "status", Value: models.TASK_RUNNING},
{ColumnName: "message", Value: ""},
{ColumnName: "began_at", Value: pipeline.BeganAt},
}, dal.Where("id = ?", pipeline.ID)))
return
}
if tx.IsErrorNotFound(err) {
pipeline = nil
err = nil
} else {
// log unexpected err
globalPipelineLog.Error(err, "dequeue failed")
}
return
}
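// For reference, the dal query above translates to roughly the following SQL
// (approximate; the exact statement depends on the dal implementation):
//
//	SELECT id FROM _devlake_pipelines
//	LEFT JOIN _devlake_pipeline_labels
//	  ON _devlake_pipeline_labels.pipeline_id = _devlake_pipelines.id
//	  AND _devlake_pipeline_labels.name LIKE 'parallel/%'
//	  AND _devlake_pipeline_labels.name IN (?)
//	WHERE status IN (?)
//	GROUP BY id
//	HAVING count(_devlake_pipeline_labels.name) = 0
//	ORDER BY id ASC
//	LIMIT 1
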
// RunPipelineInQueue queries waiting pipelines from the db and runs them in a queue
func RunPipelineInQueue(pipelineMaxParallel int64) {
sema := semaphore.NewWeighted(pipelineMaxParallel)
runningParallelLabels := []string{}
var runningParallelLabelLock sync.Mutex
var err error
for {
// start a goroutine once the semaphore is acquired and a pipeline exists.
// acquire the semaphore before dequeuing to avoid reading a stale pipeline
errors.Must(sema.Acquire(context.TODO(), 1))
globalPipelineLog.Info("acquired semaphore, waiting for the next pipeline")
var dbPipeline *models.Pipeline
for {
dbPipeline, err = dequeuePipeline(runningParallelLabels)
if err == nil && dbPipeline != nil {
break
}
time.Sleep(time.Second)
}
errors.Must(fillPipelineDetail(dbPipeline))
// add pipelineParallelLabels to runningParallelLabels
var pipelineParallelLabels []string
for _, dbLabel := range dbPipeline.Labels {
if strings.HasPrefix(dbLabel, `parallel/`) {
pipelineParallelLabels = append(pipelineParallelLabels, dbLabel)
}
}
runningParallelLabelLock.Lock()
runningParallelLabels = append(runningParallelLabels, pipelineParallelLabels...)
runningParallelLabelLock.Unlock()
go func(pipelineId uint64, parallelLabels []string) {
defer sema.Release(1)
defer func() {
runningParallelLabelLock.Lock()
runningParallelLabels = utils.SliceRemove(runningParallelLabels, parallelLabels...)
runningParallelLabelLock.Unlock()
globalPipelineLog.Info("finish pipeline #%d, now runningParallelLabels is %s", pipelineId, runningParallelLabels)
}()
globalPipelineLog.Info("run pipeline, %d, now running runningParallelLabels are %s", pipelineId, runningParallelLabels)
err = runPipeline(pipelineId)
if err != nil {
globalPipelineLog.Error(err, "failed to run pipeline %d", pipelineId)
}
}(dbPipeline.ID, pipelineParallelLabels)
}
}
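// To make the parallel-label semantics concrete: pipelines sharing a
// "parallel/<name>" label are mutually exclusive, while pipelines with
// disjoint labels may run concurrently. With the hypothetical queue below,
// #1 and #3 never overlap, but #2 can run alongside either:
//
//	pipeline #1  labels: parallel/projectA
//	pipeline #2  labels: parallel/projectB
//	pipeline #3  labels: parallel/projectA
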
func getProjectName(pipeline *models.Pipeline) (string, errors.Error) {
if pipeline == nil {
return "", errors.Default.New("pipeline is nil")
}
blueprintId := pipeline.BlueprintId
dbBlueprint := &models.Blueprint{}
err := db.First(dbBlueprint, dal.Where("id = ?", blueprintId))
if err != nil {
if db.IsErrorNotFound(err) {
return "", errors.NotFound.New(fmt.Sprintf("blueprint(id: %d) not found", blueprintId))
}
return "", errors.Internal.Wrap(err, "error getting the blueprint from database")
}
return dbBlueprint.ProjectName, nil
}
// NotifyExternal sends the pipeline status to the configured external notification endpoint
func NotifyExternal(pipelineId uint64) errors.Error {
notification := GetPipelineNotificationService()
if notification == nil {
return nil
}
// send notification to an external web endpoint
pipeline, err := GetPipeline(pipelineId, true)
if err != nil {
return err
}
projectName, err := getProjectName(pipeline)
if err != nil {
return err
}
err = notification.PipelineStatusChanged(PipelineNotificationParam{
ProjectName: projectName,
PipelineID: pipeline.ID,
CreatedAt: pipeline.CreatedAt,
UpdatedAt: pipeline.UpdatedAt,
BeganAt: pipeline.BeganAt,
FinishedAt: pipeline.FinishedAt,
Status: pipeline.Status,
})
if err != nil {
globalPipelineLog.Error(err, "failed to send notification: %v", err)
return err
}
return nil
}
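// The wire format is defined by DefaultPipelineNotificationService, not here.
// As a rough sketch only, an external receiver might look like the following
// (uses encoding/json, log, and net/http; the JSON field names mirror
// PipelineNotificationParam but the actual encoding is an assumption):
//
//	http.HandleFunc("/devlake/webhook", func(w http.ResponseWriter, r *http.Request) {
//		var param struct {
//			ProjectName string `json:"ProjectName"`
//			PipelineID  uint64 `json:"PipelineID"`
//			Status      string `json:"Status"`
//		}
//		if err := json.NewDecoder(r.Body).Decode(&param); err != nil {
//			http.Error(w, err.Error(), http.StatusBadRequest)
//			return
//		}
//		log.Printf("pipeline #%d of %s is now %s", param.PipelineID, param.ProjectName, param.Status)
//	})
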
// CancelPipeline cancels a pending or running pipeline and its tasks
func CancelPipeline(pipelineId uint64) errors.Error {
// prevent RunPipelineInQueue from consuming pending pipelines
pipeline := &models.Pipeline{}
err := db.First(pipeline, dal.Where("id = ?", pipelineId))
if err != nil {
return errors.BadInput.New("pipeline not found")
}
if pipeline.Status == models.TASK_CREATED || pipeline.Status == models.TASK_RERUN {
pipeline.Status = models.TASK_CANCELLED
err = db.Update(pipeline)
if err != nil {
return errors.Default.Wrap(err, "faile to update pipeline")
}
// now that RunPipelineInQueue is blocked and the target pipeline has been updated,
// update the related tasks as well
err = db.UpdateColumn(
&models.Task{},
"status", models.TASK_CANCELLED,
dal.Where("pipeline_id = ?", pipelineId),
)
if err != nil {
return errors.Default.Wrap(err, "faile to update pipeline tasks")
}
// the target pipeline is pending, not running; no need to perform the actual cancel operation
return nil
}
pendingTasks, count, err := GetTasks(&TaskQuery{PipelineId: pipelineId, Pending: 1, Pagination: Pagination{PageSize: -1}})
if err != nil {
return errors.Convert(err)
}
if count == 0 {
return nil
}
for _, pendingTask := range pendingTasks {
_ = CancelTask(pendingTask.ID)
}
return nil
}
// getPipelineLogsPath gets the logs directory of this pipeline
func getPipelineLogsPath(pipeline *models.Pipeline) (string, errors.Error) {
pipelineLog := GetPipelineLogger(pipeline)
path := filepath.Dir(pipelineLog.GetConfig().Path)
_, err := os.Stat(path)
if err == nil {
return path, nil
}
if os.IsNotExist(err) {
return "", errors.NotFound.Wrap(err, fmt.Sprintf("logs for pipeline #%d not found", pipeline.ID))
}
return "", errors.Default.Wrap(err, fmt.Sprintf("error validating logs path for pipeline #%d", pipeline.ID))
}
// RerunPipeline reruns all failed tasks of a pipeline, or only the specified task
func RerunPipeline(pipelineId uint64, task *models.Task) (tasks []*models.Task, err errors.Error) {
// prevent pipeline executor from doing anything that might jeopardize the integrity
pipeline := &models.Pipeline{}
txHelper := dbhelper.NewTxHelper(basicRes, &err)
tx := txHelper.Begin()
defer txHelper.End()
err = txHelper.LockTablesTimeout(2*time.Second, dal.LockTables{
{Table: "_devlake_pipelines", Exclusive: true},
{Table: "_devlake_tasks", Exclusive: true},
})
if err != nil {
err = errors.BadInput.Wrap(err, "failed to lock pipeline table, is there any pending pipeline or deletion?")
return
}
// load the pipeline
err = tx.First(pipeline, dal.Where("id = ?", pipelineId))
if err != nil {
return nil, err
}
// verify the status
if pipeline.Status == models.TASK_RUNNING {
return nil, errors.BadInput.New("pipeline is running")
}
if pipeline.Status == models.TASK_CREATED || pipeline.Status == models.TASK_RERUN {
return nil, errors.BadInput.New("pipeline is waiting to run")
}
// determine which tasks to rerun
var failedTasks []*models.Task
if task != nil {
if task.PipelineId != pipelineId {
return nil, errors.BadInput.New("the task ID and pipeline ID doesn't match")
}
failedTasks = append(failedTasks, task)
} else {
tasks, err := GetTasksWithLastStatus(pipelineId, false, tx)
if err != nil {
return nil, errors.Default.Wrap(err, "error getting tasks")
}
for _, t := range tasks {
if t.Status != models.TASK_COMPLETED {
failedTasks = append(failedTasks, t)
}
}
}
// no tasks to rerun
if len(failedTasks) == 0 {
return nil, errors.BadInput.New("no tasks to be re-ran")
}
// create new tasks
rerunTasks := []*models.Task{}
for _, t := range failedTasks {
// mark previous task failed
t.Status = models.TASK_FAILED
err := tx.UpdateColumn(t, "status", models.TASK_FAILED)
if err != nil {
return nil, err
}
// create new task
rerunTask, err := createTask(&models.NewTask{
PipelineTask: &models.PipelineTask{
Plugin: t.Plugin,
Subtasks: t.Subtasks,
Options: t.Options,
},
PipelineId: t.PipelineId,
PipelineRow: t.PipelineRow,
PipelineCol: t.PipelineCol,
IsRerun: true,
}, tx)
if err != nil {
return nil, err
}
// append to result
rerunTasks = append(rerunTasks, rerunTask)
}
// mark the pipeline rerun
err = tx.UpdateColumn(&models.Pipeline{},
"status", models.TASK_RERUN,
dal.Where("id = ?", pipelineId),
)
if err != nil {
return nil, err
}
return rerunTasks, nil
}
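// A minimal usage sketch: pass nil to rerun every non-completed task, or a
// specific task to rerun only that one (IDs are illustrative):
//
//	// rerun all failed tasks of pipeline #7
//	tasks, err := RerunPipeline(7, nil)
//
//	// rerun a single task, which must belong to pipeline #7
//	tasks, err = RerunPipeline(7, failedTask)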