internal/schedule/inline_scheduler.go (104 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one or more // contributor license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright ownership. // The ASF licenses this file to You under the Apache License, Version 2.0 // (the "License"); you may not use this file except in compliance with // the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package schedule import ( "context" "github.com/apache/incubator-eventmesh/eventmesh-server-go/log" "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/config" "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/constants" "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal" "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal/model" "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/filter" "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/metrics" "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/task" "github.com/avast/retry-go/v4" "gorm.io/gorm" "time" ) func init() { schedulerBuilder["in-line"] = NewInlineScheduler } const ( schedulerLockKey = "scheduler_lock" schedulerLockTimeout = 2 ) type inlineScheduler struct { workflowDAL dal.WorkflowDAL } func NewInlineScheduler() Scheduler { var s inlineScheduler s.workflowDAL = dal.NewWorkflowDAL() return &s } func (s *inlineScheduler) Run() { go func() { ticker := time.NewTicker(time.Millisecond * time.Duration(config.Get().Flow.Scheduler.Interval)) defer func() { if err := recover(); err != nil { log.Get(constants.LogSchedule).Errorf("schedule run error=%+v", err) } ticker.Stop() }() for range ticker.C { s.handle() } }() } func (s *inlineScheduler) handle() { if err := dal.GetDalClient().Transaction(func(tx *gorm.DB) error { var res *model.WorkflowTaskInstance if err := s.lock(func() error { var err error res, err = s.fetchTask(tx) if err != nil { return err } return nil }); err != nil { return err } if res == nil { return nil } // filter input data filter.FilterWorkflowTaskInputData(res) t := task.New(res) if t == nil { return nil } if err := retry.Do(func() error { return t.Run() }, retry.Attempts(constants.RetryAttempts)); err != nil { return s.workflowDAL.UpdateTaskInstance(tx, &model.WorkflowTaskInstance{ID: res.ID, Status: constants.TaskInstanceFailStatus}) } return s.workflowDAL.UpdateTaskInstance(tx, &model.WorkflowTaskInstance{ID: res.ID, Status: constants.TaskInstanceSuccessStatus}) }); err != nil { log.Get(constants.LogSchedule).Errorf("handle UpdateTaskInstance error=%v", err) } } func (s *inlineScheduler) lock(h func() error) error { start := time.Now() l, err := dal.GetLockClient().ObtainTimeout(schedulerLockKey, schedulerLockTimeout) elapsed := time.Since(start).Milliseconds() metrics.RecordLatency(constants.MetricsScheduler, constants.MetricsDbLockAcquireTime, float64(elapsed)) if err != nil { return err } defer l.Release() return h() } func (s *inlineScheduler) fetchTask(tx *gorm.DB) (*model.WorkflowTaskInstance, error) { var err error res, err := s.workflowDAL.SelectTaskInstance(context.Background(), model.WorkflowTaskInstance{Status: constants.TaskInstanceWaitStatus}) if err != nil { return nil, err } if res == nil { return nil, nil } if err = s.workflowDAL.UpdateTaskInstance(tx, &model.WorkflowTaskInstance{ID: res.ID, Status: constants.TaskInstanceProcessStatus}); err != nil { return nil, err } return res, nil }