in pkg/updater/read.go [78:139]
// resultReaderPool starts concurrency worker goroutines that service
// readResult calls and returns a resultReader whose read function submits
// work to them.
//
// The workers run until poolCtx is done. The pool's request channel is never
// closed: callers may still be submitting when the pool shuts down, and a
// send on a closed channel panics. Shutdown is instead signalled to workers,
// submitters and waiters through poolCtx itself.
//
// The returned read function blocks until the request has been queued (or
// until ctx or poolCtx is done) and returns a func that blocks until the
// result is available. If the pool shuts down before a queued request has
// completed, that func returns poolCtx.Err().
func resultReaderPool(poolCtx context.Context, log *logrus.Entry, concurrency int) *resultReader {
	type request struct {
		ctx    context.Context
		client gcs.Downloader
		build  gcs.Build
		stop   time.Time
		res    *gcsResult
		err    error
		// done is closed by the worker after res and err have been set, so
		// a receive from done orders the waiter's reads after those writes.
		done chan struct{}
	}

	ch := make(chan *request, concurrency)

	var wg sync.WaitGroup
	wg.Add(concurrency)
	log = log.WithField("concurrency", concurrency)
	log.Info("Starting up result reader pool")

	for i := 0; i < concurrency; i++ {
		go func() {
			defer wg.Done()
			for {
				select {
				case <-poolCtx.Done():
					return
				case req := <-ch:
					req.res, req.err = readResult(req.ctx, req.client, req.build, req.stop)
					close(req.done)
				}
			}
		}()
	}

	go func() {
		<-poolCtx.Done()
		log.Info("Shutting down result reader pool")
		// Do not close(ch): submitters may still be selecting on a send.
		wg.Wait()
		log.Info("Result reader pool stopped")
	}()

	readResultViaPool := func(ctx context.Context, client gcs.Downloader, build gcs.Build, stop time.Time) func() (*gcsResult, error) {
		req := &request{
			ctx:    ctx,
			client: client,
			build:  build,
			stop:   stop,
			done:   make(chan struct{}),
		}
		select {
		case <-ctx.Done():
			return func() (*gcsResult, error) { return nil, ctx.Err() }
		case <-poolCtx.Done():
			return func() (*gcsResult, error) { return nil, poolCtx.Err() }
		case ch <- req: // wait for request to get onto the queue
			return func() (*gcsResult, error) {
				// Prefer a finished result over the shutdown signal when
				// both are ready.
				select {
				case <-req.done:
					return req.res, req.err
				default:
				}
				select {
				case <-req.done:
					return req.res, req.err
				case <-poolCtx.Done():
					// The workers may have exited without picking this
					// request up; res and err are not read here because a
					// worker could still be writing them.
					return nil, poolCtx.Err()
				}
			}
		}
	}

	return &resultReader{
		lock: &sync.Mutex{},
		read: readResultViaPool,
	}
}