func resultReaderPool()

in pkg/updater/read.go [78:139]


// resultReaderPool starts a fixed set of worker goroutines that call
// readResult for queued requests, and returns a resultReader whose read
// function hands work to those workers.
//
// The pool lives until poolCtx is done. At that point the queue is closed,
// the workers finish any requests that were already queued, and then exit.
// A read requested during or after shutdown is never queued; its result
// function returns poolCtx.Err() instead.
//
// A concurrency below 1 is treated as 1.
func resultReaderPool(poolCtx context.Context, log *logrus.Entry, concurrency int) *resultReader {
	if concurrency < 1 {
		// Zero workers would leave every request waiting until its context
		// expires, and a negative value panics in make and WaitGroup.Add.
		concurrency = 1
	}

	type request struct {
		ctx    context.Context
		client gcs.Downloader
		build  gcs.Build
		stop   time.Time
		res    *gcsResult
		err    error
		wg     sync.WaitGroup // released by a worker once res and err are set
	}

	ch := make(chan *request, concurrency)

	// mu orders sends on ch against close(ch). Submitters hold the read lock
	// while sending; shutdown takes the write lock before closing. Without
	// this, a read requested after poolCtx is done would send on a closed
	// channel and panic.
	var (
		mu     sync.RWMutex
		closed bool
	)

	var wg sync.WaitGroup
	wg.Add(concurrency)
	log = log.WithField("concurrency", concurrency)
	log.Info("Starting up result reader pool")

	for i := 0; i < concurrency; i++ {
		go func() {
			defer wg.Done()
			// range keeps receiving after close(ch) until the buffer is
			// empty, so every request that made it onto the queue is served.
			for req := range ch {
				req.res, req.err = readResult(req.ctx, req.client, req.build, req.stop)
				req.wg.Done()
			}
		}()
	}

	go func() {
		<-poolCtx.Done()
		log.Info("Shutting down result reader pool")
		// Submitters blocked in their select observe poolCtx.Done() and drop
		// the read lock, so this write lock is acquired promptly.
		mu.Lock()
		closed = true
		close(ch)
		mu.Unlock()
		wg.Wait()
		log.Info("Result reader pool stopped")
	}()

	// readResultViaPool queues a read of build and returns a function that
	// blocks until the result is available. It blocks until the request is on
	// the queue, ctx is done, or the pool is shutting down.
	readResultViaPool := func(ctx context.Context, client gcs.Downloader, build gcs.Build, stop time.Time) func() (*gcsResult, error) {

		req := &request{
			ctx:    ctx,
			client: client,
			build:  build,
			stop:   stop,
		}
		req.wg.Add(1)

		mu.RLock()
		defer mu.RUnlock()
		if closed {
			return func() (*gcsResult, error) { return nil, poolCtx.Err() }
		}

		select {
		case <-ctx.Done():
			return func() (*gcsResult, error) { return nil, ctx.Err() }
		case <-poolCtx.Done():
			return func() (*gcsResult, error) { return nil, poolCtx.Err() }
		case ch <- req: // wait for request to get onto the queue
			return func() (*gcsResult, error) {
				req.wg.Wait()
				return req.res, req.err
			}
		}
	}

	return &resultReader{
		lock: &sync.Mutex{},
		read: readResultViaPool,
	}
}