func tabUpdatePool()

in pkg/summarizer/summary.go [632:690]


func tabUpdatePool(poolCtx context.Context, log *logrus.Entry, concurrency int, features FeatureFlags) *tabUpdater {
	type request struct {
		ctx   context.Context
		tab   *configpb.DashboardTab
		group *configpb.TestGroup
		read  gridReader
		sum   *summarypb.DashboardTabSummary
		err   error
		wg    sync.WaitGroup
	}

	ch := make(chan *request, concurrency)

	var wg sync.WaitGroup
	wg.Add(concurrency)
	log = log.WithField("concurrency", concurrency)
	log.Info("Starting up worker pool")

	for i := 0; i < concurrency; i++ {
		go func() {
			defer wg.Done()
			for req := range ch {
				req.sum, req.err = updateTab(req.ctx, req.tab, req.group, req.read, features)
				req.wg.Done()
			}
		}()
	}

	go func() {
		<-poolCtx.Done()
		log.Info("Shutting down worker pool")
		close(ch)
		wg.Wait()
		log.Info("Worker pool stopped")
	}()

	updateTabViaPool := func(ctx context.Context, tab *configpb.DashboardTab, group *configpb.TestGroup, groupReader gridReader) func() (*summarypb.DashboardTabSummary, error) {
		req := &request{
			ctx:   ctx,
			tab:   tab,
			group: group,
			read:  groupReader,
		}
		req.wg.Add(1)
		select {
		case <-ctx.Done():
			return func() (*summarypb.DashboardTabSummary, error) { return nil, ctx.Err() }
		case ch <- req:
			return func() (*summarypb.DashboardTabSummary, error) {
				req.wg.Wait()
				return req.sum, req.err
			}
		}
	}

	return &tabUpdater{
		update: updateTabViaPool,
	}
}