pkg/scheduler/periodic.go (69 lines of code) (raw):
package scheduler
import (
"context"
"time"
"github.com/Azure/adx-mon/pkg/logger"
)
// Elector reports leadership status. ScheduleEvery consults it on every tick
// and skips the scheduled function while IsLeader returns false.
type Elector interface {
// IsLeader returns true when scheduled work is allowed to run.
IsLeader() bool
}
// Periodic runs functions at fixed intervals on background goroutines.
// Construct it with NewScheduler, call Open before scheduling work, and call
// Close to stop every goroutine started by ScheduleEvery.
type Periodic struct {
// cancelFn is set by Open and invoked by Close.
cancelFn context.CancelFunc
// closing is created by Open and closed by Close; the goroutines started by
// ScheduleEvery exit once it is closed.
closing chan struct{}
// elector is optional. When nil, scheduled functions run on every tick;
// otherwise they run only while elector.IsLeader() returns true.
elector Elector
}
// NewScheduler builds a Periodic scheduler that consults elector before each
// scheduled run. A nil elector disables the leadership check. The returned
// scheduler still has to be opened with Open before it is used.
func NewScheduler(elector Elector) *Periodic {
	p := new(Periodic)
	p.elector = elector
	return p
}
// Open prepares the scheduler for use. It creates the closing channel that
// scheduled goroutines watch and stores the cancel function of a context
// derived from ctx so that Close can invoke it.
//
// Note that the derived context itself is not retained; only its cancel
// function is kept. Open always returns nil.
func (s *Periodic) Open(ctx context.Context) error {
	s.closing = make(chan struct{})
	_, s.cancelFn = context.WithCancel(ctx)
	return nil
}
// Close stops the scheduler: it invokes the cancel function stored by Open
// and closes the closing channel, which makes every goroutine started by
// ScheduleEvery return.
//
// Close is a no-op on a scheduler that was never opened, and it may be called
// more than once sequentially. It is not safe to call Close concurrently from
// multiple goroutines. Close always returns nil.
func (s *Periodic) Close() error {
	// Open may never have been called; calling a nil func would panic.
	if s.cancelFn != nil {
		s.cancelFn()
	}

	// Closing a nil channel panics, so there is nothing to do before Open.
	if s.closing == nil {
		return nil
	}

	// Closing an already-closed channel panics; a closed channel is always
	// ready to receive, so use that to detect a repeated Close.
	select {
	case <-s.closing:
	default:
		close(s.closing)
	}
	return nil
}
// ScheduleEvery runs fn on a dedicated background goroutine once per interval
// until the scheduler is closed.
//
// If the scheduler was created with a non-nil Elector, fn is skipped on every
// tick for which IsLeader returns false, so the task only runs on the leader.
//
// Parameters:
//   - interval: The duration between executions of fn. It must be positive;
//     otherwise an error is logged and nothing is scheduled.
//   - name: The name of the scheduled task, used for logging purposes.
//   - fn: The function executed on each tick. The context it receives is
//     cancelled when the scheduler is closed, so long-running tasks can stop
//     promptly. Errors returned by fn are logged and do not stop the schedule.
//
// Executions of fn for one schedule never overlap: the next tick is only
// serviced after the previous call returns. Call Open before ScheduleEvery so
// that the closing channel exists when the goroutine starts.
//
// Example usage:
//
//	scheduler := NewScheduler(elector)
//	scheduler.Open(ctx)
//	scheduler.ScheduleEvery(time.Minute, "example-task", func(ctx context.Context) error {
//		// Task implementation
//		return nil
//	})
func (s *Periodic) ScheduleEvery(interval time.Duration, name string, fn func(ctx context.Context) error) {
	// time.NewTicker panics on a non-positive duration. Inside the background
	// goroutine that panic could not be recovered by the caller and would
	// take the whole process down, so reject the interval up front.
	if interval <= 0 {
		logger.Errorf("Not scheduling task %s: interval must be positive, got %s", name, interval)
		return
	}

	go func() {
		// taskCtx is handed to fn. It is cancelled when this goroutine exits
		// and, via the watcher below, as soon as the scheduler is closed.
		taskCtx, cancel := context.WithCancel(context.Background())
		defer cancel()

		// The main loop is blocked while fn runs, so a separate watcher is
		// needed to cancel an in-flight task when the scheduler closes.
		go func() {
			select {
			case <-s.closing:
				cancel()
			case <-taskCtx.Done():
			}
		}()

		t := time.NewTicker(interval)
		defer t.Stop()

		for {
			select {
			case <-s.closing:
				return
			case <-t.C:
				// Only the leader runs the task; a nil elector means no
				// leadership check at all.
				if s.elector != nil && !s.elector.IsLeader() {
					continue
				}
				if err := fn(taskCtx); err != nil {
					logger.Errorf("Failed to run scheduled task %s: %s", name, err)
				}
			}
		}
	}()
}
// Runner is a named unit of work that RunForever executes on every tick.
type Runner interface {
// Run performs the work. RunForever logs a non-nil error and moves on.
Run(ctx context.Context) error
// Name identifies the runner in RunForever's error logs.
Name() string
}
// RunForever runs the given runners once per interval until ctx is done.
// It blocks the calling goroutine; the runners are run sequentially, in the
// order given, on that same goroutine.
//
// Parameters:
//   - ctx: Controls the lifetime of the loop and is passed to every Runner.
//     Once it is done, no further runner is started and the function returns.
//   - interval: The duration between rounds. It must be positive, because
//     time.NewTicker panics otherwise.
//   - runners: The Runner implementations to execute on each tick.
//
// An error returned by a Runner is logged together with its Name and does not
// prevent the remaining runners, or later rounds, from running.
//
// Example usage:
//
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel()
//	RunForever(ctx, time.Minute, runner1, runner2)
func RunForever(ctx context.Context, interval time.Duration, runners ...Runner) {
	t := time.NewTicker(interval)
	defer t.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			for _, r := range runners {
				// ctx may have been cancelled by an earlier runner in this
				// round, or select may have picked the tick although ctx was
				// already done. Do not start work with a dead context.
				if ctx.Err() != nil {
					return
				}
				if err := r.Run(ctx); err != nil {
					logger.Errorf("Failed to run Runner %s: %v", r.Name(), err)
				}
			}
		}
	}
}