internal/pkg/scheduler/scheduler.go (94 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
// Package scheduler provides the ability to run functions on a schedule
package scheduler
import (
"context"
"errors"
"time"
"math/rand"
"github.com/rs/zerolog"
"golang.org/x/sync/errgroup"
)
const (
defaultSplayPercent = 10
defaultFirstRunDelay = 10 * time.Second
)
// WorkFunc is the type of function a Scheduler can run.
type WorkFunc func(ctx context.Context) error
// Schedule tracks when to execute a WorkFunc.
type Schedule struct {
Name string
Interval time.Duration // Time between executions
WorkFn WorkFunc
}
// Scheduler tracks scheduled functions.
type Scheduler struct {
splayPercent int
firstRunDelay time.Duration // Interval to run the scheduled function for the first time since the scheduler started, splayed as well.
rand *rand.Rand
schedules []Schedule
}
// OptFunc is a functional option used to configure a scheduler.
type OptFunc func(*Scheduler) error
// WithSplayPercent sets the splay value as a percentage.
// Only values less then 100 are allowed.
func WithSplayPercent(splayPercent uint) OptFunc {
return func(s *Scheduler) error {
if splayPercent >= 100 {
return errors.New("invalid splay value, expected < 100")
}
s.splayPercent = int(splayPercent) //nolint:gosec // disable G115
return nil
}
}
// WithFirstRunDelay sets the amount of time that scheduled functions will wait on the first execution.
func WithFirstRunDelay(delay time.Duration) OptFunc {
return func(s *Scheduler) error {
s.firstRunDelay = delay
return nil
}
}
// New creates a new Scheduler with the specified schedules.
// Schedules may not be added to a scheduler after creation.
func New(schedules []Schedule, opts ...OptFunc) (*Scheduler, error) {
s := &Scheduler{
splayPercent: defaultSplayPercent,
firstRunDelay: defaultFirstRunDelay,
rand: rand.New(rand.NewSource(time.Now().UnixNano())), //nolint:gosec // used for timing offsets
schedules: schedules,
}
for _, opt := range opts {
err := opt(s)
if err != nil {
return nil, err
}
}
return s, nil
}
// Run executes all scheduled function according to their schedules.
// Schedule Interval times are guaranteed minium values (if the execution takes a very long time, the scheduler will wait Interval before running the function again).
func (s *Scheduler) Run(ctx context.Context) error {
g, ctx := errgroup.WithContext(ctx)
log := zerolog.Ctx(ctx).With().Str("ctx", "elasticsearch CG scheduler").Logger()
ctx = log.WithContext(ctx)
for _, schedule := range s.schedules {
g.Go(s.getRunScheduleFunc(ctx, schedule))
}
return g.Wait()
}
func (s *Scheduler) getRunScheduleFunc(ctx context.Context, schedule Schedule) func() error {
return func() error {
log := zerolog.Ctx(ctx).With().Str("schedule", schedule.Name).Logger()
t := time.NewTimer(s.intervalWithSplay(s.firstRunDelay)) // Initial schedule to run right away with splayed randomly delay
defer t.Stop()
for {
select {
case <-ctx.Done():
log.Debug().Msg("exiting on context cancel")
return nil
case <-t.C:
runSchedule(ctx, log, schedule)
t.Reset(s.intervalWithSplay(schedule.Interval))
}
}
}
}
func (s *Scheduler) intervalWithSplay(interval time.Duration) time.Duration {
percent := 100 - s.splayPercent + s.rand.Intn(2*s.splayPercent+1)
return time.Duration(int64(interval) / int64(100.0) * int64(percent))
}
func runSchedule(ctx context.Context, log zerolog.Logger, schedule Schedule) {
log.Debug().Dur("interval", schedule.Interval).Msg("started")
err := schedule.WorkFn(ctx)
if err != nil {
log.Warn().Err(err).Msg("scheduler.runSchedule: failed running schedule function")
}
log.Debug().Msg("finished")
}