util/queue/queue.go (260 lines of code) (raw):

/* Copyright 2021 The TestGrid Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ // Package queue contains methods and structures for manipulating and persisting // data that should be repeatedly processed in a prioritized (but changing) order. // // Esepcially useful for combining with systems like pubsub to receive events // that change the order of queue processing. package queue import ( "container/heap" "context" "errors" "fmt" "sync" "time" "github.com/sirupsen/logrus" ) // Queue can send names to receivers at a specific frequency. // // Also contains the ability to modify the next time to send names. // First call must be to Init(). // Exported methods are safe to call concurrently. type Queue struct { queue priorityQueue items map[string]*item lock sync.RWMutex signal chan struct{} log logrus.FieldLogger } // Init (or reinit) the queue with the specified groups, which should be updated at frequency. func (q *Queue) Init(log logrus.FieldLogger, names []string, when time.Time) { n := len(names) found := make(map[string]bool, n) q.lock.Lock() defer q.lock.Unlock() defer q.rouse() if q.signal == nil { q.signal = make(chan struct{}) } if q.items == nil { q.items = make(map[string]*item, n) } if q.queue == nil { q.queue = make(priorityQueue, 0, n) } items := q.items q.log = log for _, name := range names { found[name] = true if _, ok := items[name]; ok { continue } it := &item{ name: name, when: when, index: len(q.queue), } heap.Push(&q.queue, it) items[name] = it log.WithFields(logrus.Fields{ "when": when, "name": name, }).Info("Adding name to queue") } for name, it := range items { if found[name] { continue } log.WithField("name", name).Info("Removing name from queue") heap.Remove(&q.queue, it.index) delete(q.items, name) } } // FixAll will fix multiple groups inside a single critical section. // // If later is set then it will move out the next update time, otherwise // it will only reduce it. func (q *Queue) FixAll(whens map[string]time.Time, later bool) error { q.lock.Lock() defer q.lock.Unlock() var missing []string defer q.rouse() reduced := map[string]time.Time{} fixed := map[string]time.Time{} for name, when := range whens { it, ok := q.items[name] if !ok { missing = append(missing, name) continue } if when.Before(it.when) { reduced[name] = when } else if later && !when.Equal(it.when) { fixed[name] = when } else { continue } it.when = when } log := q.log var any bool if n := len(reduced); n > 0 { log = log.WithField("reduced", n) any = true } if n := len(fixed); n > 0 { log = log.WithField("fixed", n) any = true } heap.Init(&q.queue) if len(missing) > 0 { return fmt.Errorf("not found: %v", missing) } if any { log.Info("Fixed all names") } return nil } // Fix the next time to send the group to receivers. // // If later is set then it will move out the next update time, otherwise // it will only reduce it. func (q *Queue) Fix(name string, when time.Time, later bool) error { q.lock.Lock() defer q.lock.Unlock() defer q.rouse() it, ok := q.items[name] if !ok { return errors.New("not found") } log := q.log.WithFields(logrus.Fields{ "group": name, "when": when, }) if when.Before(it.when) { log = log.WithField("reduced minutes", it.when.Sub(when).Round(time.Second).Minutes()) } else if later && !when.Equal(it.when) { log = log.WithField("delayed minutes", when.Sub(it.when).Round(time.Second).Minutes()) } else { return nil } it.when = when heap.Fix(&q.queue, it.index) log.Info("Fixed names") return nil } // Current status for each item in the queue. func (q *Queue) Current() map[string]time.Time { currently := make(map[string]time.Time, len(q.queue)) q.lock.RLock() defer q.lock.RUnlock() for _, item := range q.queue { currently[item.name] = item.when } return currently } // Status of the queue: depth, next item and when the next item is ready. func (q *Queue) Status() (int, *string, time.Time) { q.lock.RLock() defer q.lock.RUnlock() var who *string var when time.Time if it := q.queue.peek(); it != nil { who = &it.name when = it.when } return len(q.queue), who, when } func (q *Queue) rouse() { select { case q.signal <- struct{}{}: // wake up early default: // not sleeping } } func (q *Queue) sleep(d time.Duration) { log := q.log.WithFields(logrus.Fields{ "seconds": d.Round(100 * time.Millisecond).Seconds(), }) if d > 5*time.Second { log.Info("Sleeping...") } else { log.Debug("Sleeping...") } sleep := time.NewTimer(d) defer sleep.Stop() start := time.Now() select { case <-q.signal: dur := time.Now().Sub(start) log := log.WithField("after", dur.Round(time.Millisecond)) switch { case dur > 10*time.Second: log.Info("Roused") case dur > time.Second: log.Debug("Roused") default: log.Trace("Roused") } case <-sleep.C: } } // Send test groups to receivers until the context expires. // // Pops items off the queue when frequency is zero. // Otherwise reschedules the item after the specified frequency has elapsed. func (q *Queue) Send(ctx context.Context, receivers chan<- string, frequency time.Duration) error { var next func() (*string, time.Time) if frequency == 0 { next = func() (*string, time.Time) { if len(q.queue) == 0 { return nil, time.Time{} } it := heap.Pop(&q.queue).(*item) return &it.name, it.when } } else { next = func() (*string, time.Time) { it := q.queue.peek() if it == nil { return nil, time.Time{} } when := it.when it.when = time.Now().Add(frequency) heap.Fix(&q.queue, it.index) return &it.name, when } } for { if err := ctx.Err(); err != nil { return err } q.lock.Lock() who, when := next() q.lock.Unlock() if who == nil { if frequency == 0 { return nil } q.sleep(time.Second) continue } if dur := time.Until(when); dur > 0 { q.sleep(dur) } select { case receivers <- *who: case <-ctx.Done(): return ctx.Err() } } } type priorityQueue []*item func (pq priorityQueue) Len() int { return len(pq) } func (pq priorityQueue) Less(i, j int) bool { return pq[i].when.Before(pq[j].when) } func (pq priorityQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] pq[i].index = i pq[j].index = j } func (pq *priorityQueue) Push(something interface{}) { it := something.(*item) it.index = len(*pq) *pq = append(*pq, it) } func (pq *priorityQueue) Pop() interface{} { old := *pq n := len(old) it := old[n-1] it.index = -1 old[n-1] = nil *pq = old[0 : n-1] return it } func (pq priorityQueue) peek() *item { n := len(pq) if n == 0 { return nil } return pq[0] } type item struct { name string when time.Time index int }