pkg/timestamp/scheduler.go (155 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package timestamp
import (
"runtime/debug"
"sync"
"time"
"github.com/benbjohnson/clock"
"github.com/pkg/errors"
"github.com/robfig/cron/v3"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
)
var (
// ErrSchedulerClosed indicates the scheduler is closed.
ErrSchedulerClosed = errors.New("the scheduler is closed")
// ErrTaskDuplicated indicates registered task already exists.
ErrTaskDuplicated = errors.New("the task is duplicated")
)
// SchedulerAction is an executable when a trigger is fired
// now is the trigger time, logger has a context indicating the task's identity.
type SchedulerAction func(now time.Time, logger *logger.Logger) bool
// Scheduler maintains a registry of tasks and their duty cycle.
// It also provides a Trigger method to fire a task that is scheduled by a MockClock.
type Scheduler struct {
clock Clock
l *logger.Logger
tasks map[string]*task
sync.RWMutex
isMock bool
closed bool
}
// NewScheduler returns an instance of Scheduler.
func NewScheduler(parent *logger.Logger, clock Clock) *Scheduler {
var isMock bool
if _, ok := clock.(MockClock); ok {
isMock = true
}
return &Scheduler{
isMock: isMock,
l: parent.Named("scheduler"),
clock: clock,
tasks: make(map[string]*task),
}
}
// Register adds the given task's SchedulerAction to the Scheduler,
// and associate the given schedule expression.
func (s *Scheduler) Register(name string, options cron.ParseOption, expr string, action SchedulerAction) error {
s.Lock()
defer s.Unlock()
if s.closed {
return ErrSchedulerClosed
}
if _, ok := s.tasks[name]; ok {
return errors.WithMessage(ErrTaskDuplicated, name)
}
parser := cron.NewParser(options)
schedule, err := parser.Parse(expr)
if err != nil {
return err
}
var clock Clock
if s.isMock {
mc := NewMockClock()
mc.Set(s.clock.Now())
clock = mc
} else {
clock = s.clock
}
t := newTask(s.l.Named(name), name, clock, schedule, action)
s.tasks[name] = t
go func() {
t.run()
t.close()
s.Lock()
defer s.Unlock()
delete(s.tasks, name)
}()
return nil
}
// Trigger fire a task that is scheduled by a MockTime.
// A real clock-based task will ignore this trigger, and return false.
// If the task's name is unknown, it returns false.
func (s *Scheduler) Trigger(name string) bool {
if !s.isMock {
return false
}
var t *task
var ok bool
s.RLock()
t, ok = s.tasks[name]
s.RUnlock()
if !ok {
return false
}
c := t.clock.(MockClock)
c.Set(s.clock.Now())
return true
}
// Closed returns whether the Scheduler is closed.
func (s *Scheduler) Closed() bool {
s.RLock()
defer s.RUnlock()
return s.closed
}
// Close the Scheduler and shut down all registered tasks.
func (s *Scheduler) Close() {
s.Lock()
defer s.Unlock()
s.closed = true
for k, t := range s.tasks {
t.close()
delete(s.tasks, k)
}
}
type task struct {
clock Clock
schedule cron.Schedule
closer *run.Closer
l *logger.Logger
action SchedulerAction
name string
}
func newTask(l *logger.Logger, name string, clock clock.Clock, schedule cron.Schedule, action SchedulerAction) *task {
return &task{
l: l,
name: name,
clock: clock,
schedule: schedule,
action: action,
closer: run.NewCloser(1),
}
}
func (t *task) run() {
defer t.closer.Done()
now := t.clock.Now()
t.l.Info().Str("name", t.name).Time("now", now).Msg("start")
for {
next := t.schedule.Next(now)
d := next.Sub(now)
if e := t.l.Debug(); e.Enabled() {
e.Str("name", t.name).Time("now", now).Time("next", next).Dur("dur", d).Msg("schedule to")
}
timer := t.clock.Timer(d)
select {
case now = <-timer.C:
if e := t.l.Debug(); e.Enabled() {
e.Str("name", t.name).Time("now", now).Msg("wake")
}
if !func() (ret bool) {
defer func() {
if r := recover(); r != nil {
t.l.Error().Str("name", t.name).Interface("panic", r).Str("stack", string(debug.Stack())).Msg("panic")
ret = true
}
}()
return t.action(now, t.l)
}() {
t.l.Info().Str("name", t.name).Msg("action stops the task")
return
}
case <-t.closer.CloseNotify():
timer.Stop()
t.l.Info().Str("name", t.name).Msg("closed")
return
}
}
}
func (t *task) close() {
t.closer.CloseThenWait()
}