in pkg/summarizer/summary.go [632:690]
func tabUpdatePool(poolCtx context.Context, log *logrus.Entry, concurrency int, features FeatureFlags) *tabUpdater {
type request struct {
ctx context.Context
tab *configpb.DashboardTab
group *configpb.TestGroup
read gridReader
sum *summarypb.DashboardTabSummary
err error
wg sync.WaitGroup
}
ch := make(chan *request, concurrency)
var wg sync.WaitGroup
wg.Add(concurrency)
log = log.WithField("concurrency", concurrency)
log.Info("Starting up worker pool")
for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
for req := range ch {
req.sum, req.err = updateTab(req.ctx, req.tab, req.group, req.read, features)
req.wg.Done()
}
}()
}
go func() {
<-poolCtx.Done()
log.Info("Shutting down worker pool")
close(ch)
wg.Wait()
log.Info("Worker pool stopped")
}()
updateTabViaPool := func(ctx context.Context, tab *configpb.DashboardTab, group *configpb.TestGroup, groupReader gridReader) func() (*summarypb.DashboardTabSummary, error) {
req := &request{
ctx: ctx,
tab: tab,
group: group,
read: groupReader,
}
req.wg.Add(1)
select {
case <-ctx.Done():
return func() (*summarypb.DashboardTabSummary, error) { return nil, ctx.Err() }
case ch <- req:
return func() (*summarypb.DashboardTabSummary, error) {
req.wg.Wait()
return req.sum, req.err
}
}
}
return &tabUpdater{
update: updateTabViaPool,
}
}