internal/scheduler/scheduler.go (146 lines of code) (raw):
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package scheduler provides a utility for scheduling arbitrary jobs to run at fixed intervals.
package scheduler
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/GoogleCloudPlatform/galog"
)
// Job defines the interface between the schedule manager and the actual job.
// The scheduler tracks a job by its ID, consults ShouldEnable before
// scheduling it, and calls Run each time the interval elapses.
type Job interface {
// ID returns the job id. The scheduler uses it as the key under which the
// job is tracked; a request to schedule an ID that is already tracked is
// skipped.
ID() string
// Interval returns the interval at which job should be rescheduled and
// a bool determining if job should be scheduled starting now.
// If false, first run will be at time now+interval.
// The interval is handed to time.NewTicker when periodic execution starts,
// so a job that is meant to keep running must return a positive duration.
Interval() (time.Duration, bool)
// ShouldEnable specifies if the job should be enabled for scheduling.
// ScheduleJob returns an error when this reports false.
ShouldEnable(context.Context) bool
// Run triggers the job for single execution. It returns error if any
// and a bool stating if scheduler should continue or stop scheduling.
// A returned error is only logged; the bool alone decides whether the job
// stays on the schedule.
Run(context.Context) (bool, error)
}
// jobConfig is the scheduler's bookkeeping record for one scheduled job. One
// record is created per successful ScheduleJob call and is shared between the
// scheduler and the goroutine that runs the job periodically.
type jobConfig struct {
// job is the job interface that is executed and managed by scheduler.
job Job
// interrupt is a channel to signal and interrupt the job. ScheduleJob
// creates it unbuffered; a value received on it makes the job's goroutine
// return.
interrupt chan bool
// markedRemoved, when set, means the job is already being taken off the
// schedule: code that sees it set must neither start another run nor send
// on interrupt. The job's goroutine sets it when job.Run() reports that the
// job should not be rescheduled.
markedRemoved atomic.Bool
}
// Scheduler is a job schedule manager and offers a way to schedule/unschedule
// jobs. Its methods take a pointer receiver because the struct embeds a mutex
// and must not be copied.
type Scheduler struct {
// mu protects the jobs map.
mu sync.Mutex
// jobs maps a job ID to the configuration of the job managed by this
// scheduler under that ID.
jobs map[string]*jobConfig
}
// scheduler is the package-wide scheduler instance handed out by Instance.
// It is built in the variable initializer rather than in an init function, so
// it is ready before any init function of this package runs. The mutex is not
// set explicitly because the zero value of sync.Mutex is an unlocked mutex.
var scheduler = &Scheduler{jobs: make(map[string]*jobConfig)}

// Instance returns scheduler instance. Every call returns the same pointer, so
// all callers share one set of scheduled jobs.
func Instance() *Scheduler {
	return scheduler
}
// add registers j under its job ID in the set of jobs managed by the
// scheduler, replacing any entry already stored under that ID.
func (s *Scheduler) add(j *jobConfig) {
	s.mu.Lock()
	defer s.mu.Unlock()
	// A Scheduler that was not obtained through Instance has a nil map, and
	// assigning into a nil map panics; create the map on first use instead.
	if s.jobs == nil {
		s.jobs = make(map[string]*jobConfig)
	}
	s.jobs[j.job.ID()] = j
}
// remove drops the entry stored under jobID from the set of jobs managed by
// the scheduler. Removing an ID that is not present is a no-op.
func (s *Scheduler) remove(jobID string) {
	s.mu.Lock()
	delete(s.jobs, jobID)
	s.mu.Unlock()
}
// isScheduled reports whether an entry for jobID is currently present in the
// set of jobs managed by the scheduler.
func (s *Scheduler) isScheduled(jobID string) bool {
	s.mu.Lock()
	_, found := s.jobs[jobID]
	s.mu.Unlock()
	return found
}
// run performs a single execution of job and reports whether the scheduler
// should keep scheduling it. An error returned by the job is logged and does
// not influence the result; only the bool returned by Run does.
func run(ctx context.Context, job Job) bool {
	galog.Debugf("Executing job %q", job.ID())
	keepScheduling, runErr := job.Run(ctx)
	if runErr == nil {
		return keepScheduling
	}
	galog.Errorf("Job %q failed with error: %v", job.ID(), runErr)
	return keepScheduling
}
// ScheduleJob registers job with the scheduler and starts a goroutine that
// runs it at the interval reported by job.Interval().
//
// The steps, in order:
//   - If a job with the same ID is already scheduled, the request is skipped
//     and nil is returned.
//   - If job.ShouldEnable(ctx) reports false, an error is returned.
//   - If Interval() asks for an immediate start, the job is run once
//     synchronously. If that run reports that the job should not continue, the
//     job is not scheduled and nil is returned.
//   - If the interval is not positive, an error is returned instead of
//     starting periodic execution, because time.NewTicker panics on such an
//     interval.
//
// The already-scheduled check and the registration are separate critical
// sections, so callers should not schedule the same job ID from several
// goroutines at once.
func (s *Scheduler) ScheduleJob(ctx context.Context, job Job) error {
	id := job.ID()

	if s.isScheduled(id) {
		galog.Infof("Skipping schedule job request for %q, its already scheduled", id)
		return nil
	}

	if !job.ShouldEnable(ctx) {
		return fmt.Errorf("ShouldEnable() returned false, cannot schedule job %s", id)
	}

	interval, startNow := job.Interval()
	galog.Debugf("Adding job %q, to run at every %f seconds", id, interval.Seconds())

	if startNow && !run(ctx, job) {
		galog.Debugf("Job %q first execution returned false, won't be scheduled", id)
		return nil
	}

	// Validate only after the optional first run so that a run-once job, which
	// never reaches the ticker, is still allowed to report a zero interval.
	if interval <= 0 {
		return fmt.Errorf("job %q has non-positive interval %v, cannot run it on a schedule", id, interval)
	}

	task := &jobConfig{job: job, interrupt: make(chan bool)}
	s.add(task)
	go s.runOnSchedule(ctx, task)
	return nil
}
// runOnSchedule runs the job on fixed schedule. It is meant to be started in
// its own goroutine and returns when Run reports that the job must not be
// rescheduled, when the job is interrupted by UnscheduleJob or Stop, or when
// ctx is cancelled.
//
// Shutdown handshake shared with UnscheduleJob and Stop: exactly one party
// claims a job by flipping markedRemoved from false to true. If a stopper
// (UnscheduleJob or Stop) wins, it sends one value on j.interrupt and this
// goroutine is guaranteed to receive it. If this goroutine wins, nobody sends
// and the goroutine removes its own entry from the scheduler.
func (s *Scheduler) runOnSchedule(ctx context.Context, j *jobConfig) {
	// Closing here is safe: a stopper sends at most once, and that send has
	// already been received by the time this goroutine returns.
	defer close(j.interrupt)

	interval, _ := j.job.Interval()
	if interval <= 0 {
		// time.NewTicker panics on a non-positive interval, which would take
		// the whole process down from this goroutine.
		galog.Errorf("Job %q has non-positive interval %v, it will not be run on schedule", j.job.ID(), interval)
		s.retireJob(j)
		return
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if j.markedRemoved.Load() {
				// A stopper has claimed the job; do not start another run,
				// the interrupt is about to arrive.
				continue
			}
			if run(ctx, j.job) {
				continue
			}
			galog.Infof("Job %q execution returned false, won't be rescheduled", j.job.ID())
			s.retireJob(j)
			return
		case <-j.interrupt:
			galog.Infof("Interrupted, returning from job %q", j.job.ID())
			return
		case <-ctx.Done():
			galog.Infof("Context cancelled, returning from job %q", j.job.ID())
			s.retireJob(j)
			return
		}
	}
}

// retireJob is called by a job's own goroutine when it stops without having
// received an interrupt. If the goroutine claims the job, it removes the job's
// entry from the scheduler. If a stopper claimed it first, the goroutine
// receives the pending interrupt so that the stopper is not left blocked.
func (s *Scheduler) retireJob(j *jobConfig) {
	if !j.markedRemoved.CompareAndSwap(false, true) {
		<-j.interrupt
		return
	}

	id := j.job.ID()
	s.mu.Lock()
	defer s.mu.Unlock()
	// Only drop the entry if it still refers to this job; the ID may have been
	// registered again with a different jobConfig in the meantime.
	if s.jobs[id] == j {
		delete(s.jobs, id)
	}
}

// UnscheduleJob removes the job from schedule. It is a no-op when no job is
// registered under jobID. When it does interrupt a job, it returns only after
// the job's goroutine has received the interrupt, so it must not be called
// from within that job's own Run.
func (s *Scheduler) UnscheduleJob(jobID string) {
	galog.Infof("Unscheduling job %q", jobID)

	// Look up and delete in one critical section so the entry cannot vanish
	// between the check and the use.
	s.mu.Lock()
	task, ok := s.jobs[jobID]
	if ok {
		delete(s.jobs, jobID)
	}
	s.mu.Unlock()

	// Signal outside the lock: the send blocks until the job's goroutine is
	// ready to receive, and that goroutine may itself need s.mu to finish.
	if ok && task.markedRemoved.CompareAndSwap(false, true) {
		task.interrupt <- true
	}
}

// Stop stops executing new jobs by interrupting every job that is currently
// scheduled. Jobs that are already removing themselves are left alone. Stop
// returns after each interrupted job's goroutine has received its interrupt.
func (s *Scheduler) Stop() {
	galog.Infof("Stopping the scheduler")

	s.mu.Lock()
	claimed := make([]*jobConfig, 0, len(s.jobs))
	for id, j := range s.jobs {
		if j.markedRemoved.CompareAndSwap(false, true) {
			claimed = append(claimed, j)
			delete(s.jobs, id)
		}
	}
	s.mu.Unlock()

	// Send only after releasing the lock so that job goroutines which need
	// s.mu to finish cannot deadlock against this loop.
	for _, j := range claimed {
		j.interrupt <- true
	}
}
// ScheduleJobs hands every job in jobs to the shared scheduler instance, each
// from its own goroutine, and logs the outcome per job. When synchronous is
// true it waits for all of those ScheduleJob calls to return; because
// ScheduleJob performs the first execution inline for jobs whose Interval()
// requests an immediate start, waiting also covers those first runs. When
// synchronous is false it returns without waiting.
func ScheduleJobs(ctx context.Context, jobs []Job, synchronous bool) {
	var (
		pending sync.WaitGroup
		names   []string
	)
	target := Instance()

	// schedule submits a single job and reports the result in the log.
	schedule := func(j Job) {
		defer pending.Done()
		err := target.ScheduleJob(ctx, j)
		if err == nil {
			galog.Infof("Successfully scheduled job %s", j.ID())
			return
		}
		galog.Errorf("Failed to schedule job %s with error: %v", j.ID(), err)
	}

	for i := range jobs {
		pending.Add(1)
		names = append(names, jobs[i].ID())
		go schedule(jobs[i])
	}

	if !synchronous {
		return
	}
	galog.Debugf("Waiting for %v to finish...", names)
	pending.Wait()
}