config/queue.go (105 lines of code) (raw):
/*
Copyright 2021 The TestGrid Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"context"
"sync"
"time"
"bitbucket.org/creachadair/stringset"
configpb "github.com/GoogleCloudPlatform/testgrid/pb/config"
"github.com/GoogleCloudPlatform/testgrid/util/queue"
"github.com/sirupsen/logrus"
)
// DashboardQueue sends dashboard names at a specific frequency.
type DashboardQueue struct {
queue.Queue
dashboards map[string]*configpb.Dashboard
groups map[string]*stringset.Set
lock sync.RWMutex
}
// Init (or reinit) the queue with the specified configuration.
func (q *DashboardQueue) Init(log logrus.FieldLogger, dashboards []*configpb.Dashboard, when time.Time) {
n := len(dashboards)
names := make([]string, n)
namedDashboards := make(map[string]*configpb.Dashboard, n)
groups := make(map[string]*stringset.Set, n)
for i, d := range dashboards {
name := d.Name
names[i] = name
for _, tab := range d.DashboardTab {
if groups[tab.TestGroupName] == nil {
ns := stringset.New()
groups[tab.TestGroupName] = &ns
}
groups[tab.TestGroupName].Add(name)
}
}
q.lock.Lock()
q.Queue.Init(log, names, when)
q.dashboards = namedDashboards
q.groups = groups
q.lock.Unlock()
}
// FixTestGroups will fix all the dashboards associated with the groups.
func (q *DashboardQueue) FixTestGroups(when time.Time, later bool, groups ...string) error {
q.lock.RLock()
defer q.lock.RUnlock()
dashboards := make(map[string]time.Time, len(groups))
for _, groupName := range groups {
dashes := q.groups[groupName]
if dashes == nil {
continue
}
for dashName := range *dashes {
dashboards[dashName] = when
}
}
return q.FixAll(dashboards, later)
}
// TestGroupQueue can send test groups to receivers at a specific frequency.
//
// Also contains the ability to modify the next time to send groups.
// First call must be to Init().
// Exported methods are safe to call concurrently.
type TestGroupQueue struct {
queue.Queue
groups map[string]*configpb.TestGroup
lock sync.RWMutex
}
// Init (or reinit) the queue with the specified groups, which should be updated at frequency.
func (q *TestGroupQueue) Init(log logrus.FieldLogger, testGroups []*configpb.TestGroup, when time.Time) {
n := len(testGroups)
groups := make(map[string]*configpb.TestGroup, n)
names := make([]string, n)
for i, tg := range testGroups {
name := tg.Name
names[i] = name
groups[name] = tg
}
q.lock.Lock()
q.Queue.Init(log, names, when)
q.groups = groups
q.lock.Unlock()
}
// Status of the queue: depth, next item and when the next item is ready.
func (q *TestGroupQueue) Status() (int, *configpb.TestGroup, time.Time) {
q.lock.RLock()
defer q.lock.RUnlock()
var tg *configpb.TestGroup
var when time.Time
n, who, when := q.Queue.Status()
if who != nil {
tg = q.groups[*who]
}
return n, tg, when
}
// Send test groups to receivers until the context expires.
//
// Pops items off the queue when frequency is zero.
// Otherwise reschedules the item after the specified frequency has elapsed.
func (q *TestGroupQueue) Send(ctx context.Context, receivers chan<- *configpb.TestGroup, frequency time.Duration) error {
ch := make(chan string)
var err error
go func() {
err = q.Queue.Send(ctx, ch, frequency)
close(ch)
}()
for who := range ch {
q.lock.RLock()
tg := q.groups[who]
q.lock.RUnlock()
if tg == nil {
continue
}
select {
case <-ctx.Done():
return ctx.Err()
case receivers <- tg:
}
}
return err
}