google_guest_agent/scheduler/scheduler.go (129 lines of code) (raw):
// Copyright 2023 Google LLC
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// https://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package scheduler provides a utility for scheduling arbitrary jobs at fixed intervals.
package scheduler
import (
"context"
"fmt"
"sync"
"time"
"github.com/GoogleCloudPlatform/guest-logging-go/logger"
"github.com/robfig/cron/v3"
)
// Job is the contract a unit of work must satisfy so that the schedule
// manager can run it.
type Job interface {
	// ID reports the job id.
	ID() string
	// Interval reports how often the job should be rescheduled (interval) and
	// whether scheduling should start now (startNow). When startNow is false
	// the first run happens at time now+interval.
	Interval() (interval time.Duration, startNow bool)
	// ShouldEnable reports whether the job should be enabled for scheduling.
	ShouldEnable(ctx context.Context) bool
	// Run triggers a single execution of the job. reschedule tells the
	// scheduler whether to continue (true) or stop (false) scheduling the job;
	// err carries the execution failure, if any.
	Run(ctx context.Context) (reschedule bool, err error)
}
// Scheduler implements job schedule manager and offers a way to schedule/unschedule new jobs.
type Scheduler struct {
	// cron is the underlying cron instance that job entries are added to and
	// removed from.
	cron *cron.Cron
	// jobs maps a job ID to the cron entry ID returned when the job was added.
	jobs map[string]cron.EntryID
	// mu is intended to guard access to jobs.
	mu sync.RWMutex
}
// scheduler is the package-wide Scheduler instance handed out by Get.
var scheduler *Scheduler

// init builds the package-wide scheduler with an empty job table and a cron
// instance that logs through cronLogger. The mutex is left at its zero value.
func init() {
	scheduler = &Scheduler{
		jobs: make(map[string]cron.EntryID),
		cron: cron.New(cron.WithLogger(&cronLogger{})),
	}
}
// Get starts the package-level scheduler and returns it.
// start is invoked on every call, not only the first one.
func Get() *Scheduler {
	scheduler.start()
	return scheduler
}
// getFunc wraps job in a niladic closure that can be handed to the cron
// scheduler. Each invocation of the closure logs the run, executes job.Run
// with ctx, unschedules the job when Run asks to stop, and logs any error
// Run returned.
func (s *Scheduler) getFunc(ctx context.Context, job Job) func() {
	return func() {
		logger.Infof("Invoking job %q", job.ID())

		keepScheduling, runErr := job.Run(ctx)
		if !keepScheduling {
			s.UnscheduleJob(job.ID())
		}

		if runErr == nil {
			return
		}
		logger.Errorf("Failed to execute job %s: %v", job.ID(), runErr)
	}
}
// ScheduleJob registers job with the scheduler at the interval the job
// reports. It returns an error when the job's ShouldEnable declines or when
// the job cannot be added to the schedule. synchronous is forwarded to
// jobInit together with the job's start-now preference.
func (s *Scheduler) ScheduleJob(ctx context.Context, job Job, synchronous bool) error {
	if enabled := job.ShouldEnable(ctx); !enabled {
		return fmt.Errorf("ShouldEnable() returned false, cannot schedule job %s", job.ID())
	}

	logger.Infof("Scheduling job: %s", job.ID())

	every, runNow := job.Interval()
	return s.jobInit(job.ID(), every, s.getFunc(ctx, job), runNow, synchronous)
}
// setEntryID records entryID as the cron entry for jobID, replacing any
// existing mapping for that ID. The write is done under the exclusive lock.
func (s *Scheduler) setEntryID(jobID string, entryID cron.EntryID) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.jobs[jobID] = entryID
}
// jobInit adds job to the schedule to run at specified interval.
// Setting startImmediately to true executes first run immediately, otherwise
// first run will be after interval (at now+interval).
// If startImmediately and synchronous both are true, init method will block
// until job is completed.
//
// If jobID is already scheduled the call is a no-op and returns nil. An error
// is returned only when the job cannot be added to cron.
func (s *Scheduler) jobInit(jobID string, interval time.Duration, job func(), startImmediately, synchronous bool) error {
	logger.Infof("Scheduling job %q to run at %f hr interval", jobID, interval.Hours())

	added, err := s.registerEntry(jobID, interval, job)
	if err != nil {
		return err
	}
	if !added {
		logger.Infof("Skipping, job %q is already scheduled", jobID)
		return nil
	}

	if !startImmediately {
		return nil
	}

	// The first run happens outside of s.mu: the wrapped job may itself call
	// UnscheduleJob, which needs the lock.
	if synchronous {
		job()
	} else {
		// Start job in a go routine to not block the caller.
		go job()
	}
	return nil
}

// registerEntry atomically checks whether jobID is already scheduled and, if
// not, adds job to cron and records its entry ID. It reports whether a new
// entry was added.
func (s *Scheduler) registerEntry(jobID string, interval time.Duration, job func()) (bool, error) {
	// The check, the add and the map write are done under one exclusive lock
	// so that concurrent callers cannot both register the same jobID and so
	// that the map is never read while another goroutine writes it.
	s.mu.Lock()
	defer s.mu.Unlock()

	if _, found := s.jobs[jobID]; found {
		return false, nil
	}

	// The interval is passed to cron in whole seconds; any fractional part is
	// truncated by the int conversion.
	entry, err := s.cron.AddFunc(fmt.Sprintf("@every %ds", int(interval.Seconds())), job)
	if err != nil {
		return false, fmt.Errorf("unable to schedule %q: %w", jobID, err)
	}

	s.jobs[jobID] = entry
	return true, nil
}
// UnscheduleJob removes the job from schedule. It removes the cron entry
// recorded for jobID and forgets the mapping; a jobID that is not scheduled
// is ignored.
func (s *Scheduler) UnscheduleJob(jobID string) {
	// delete mutates s.jobs, so the exclusive lock is required. A read lock
	// would allow this write to run concurrently with other readers and
	// unschedulers of the map.
	s.mu.Lock()
	defer s.mu.Unlock()

	logger.Infof("Unscheduling job %q", jobID)
	if entry, found := s.jobs[jobID]; found {
		s.cron.Remove(entry)
		delete(s.jobs, jobID)
	}
}
// start begins executing each job at defined interval.
// It logs the start and then calls Start on the underlying cron instance.
func (s *Scheduler) start() {
	logger.Infof("Starting the scheduler to run jobs")
	s.cron.Start()
}
// Stop stops executing new jobs.
// It logs the stop and calls Stop on the underlying cron instance; nothing
// from that call is used here.
// NOTE(review): presumably jobs already in flight are not waited for — confirm
// against the cron library's Stop semantics.
func (s *Scheduler) Stop() {
	logger.Infof("Stopping the scheduler")
	s.cron.Stop()
}
// ScheduleJobs hands every job in jobs to the package-level scheduler, each
// from its own goroutine, and logs the outcome per job. When synchronous is
// true it blocks until all of those goroutines have returned; otherwise it
// returns right after launching them.
func ScheduleJobs(ctx context.Context, jobs []Job, synchronous bool) {
	var (
		pending sync.WaitGroup
		names   []string
	)
	sched := Get()

	for _, job := range jobs {
		pending.Add(1)
		names = append(names, job.ID())

		go func(j Job) {
			defer pending.Done()

			err := sched.ScheduleJob(ctx, j, synchronous)
			if err == nil {
				logger.Infof("Successfully scheduled job %s", j.ID())
				return
			}
			logger.Errorf("Failed to schedule job %s with error: %v", j.ID(), err)
		}(job)
	}

	if !synchronous {
		return
	}
	logger.Debugf("Waiting for %v to finish...", names)
	pending.Wait()
}
// IsScheduled reports whether jobID currently has an entry in the scheduler's
// job table.
func (s *Scheduler) IsScheduled(jobID string) bool {
	// A read lock is sufficient: the map is only looked up here.
	s.mu.RLock()
	defer s.mu.RUnlock()
	_, found := s.jobs[jobID]
	return found
}