in pkg/cloud/rgraph/algo/parallel_queue.go [152:199]
// Run drives the queue until there is nothing pending and nothing active, a
// task reports an error, or ctx is done. op is handed to q.launch, which is
// responsible for starting work on pending items.
//
// Run may be called only once per queue; a later call returns an error without
// doing any work. It returns nil once the queue has drained, ctx.Err() if the
// context is done first, or the first non-nil task error received on q.done.
// On either early exit Run marks the queue done and returns immediately; it
// does not wait for tasks that are still active.
func (q *ParallelQueue[T]) Run(ctx context.Context, op func(context.Context, T) error) error {
	q.lock.Lock()
	if q.state != stateNotStarted {
		// Snapshot the state while the lock is still held: reading q.state
		// after Unlock would race with a concurrent Run that is updating it.
		state := q.state
		q.lock.Unlock()
		return fmt.Errorf("Run() can only be called once (state=%d)", state)
	}
	q.state = stateRunning
	q.lock.Unlock()

	for {
		q.lock.Lock()
		klog.V(4).Infof("Run loop: pending: %d active: %d", len(q.pending), q.active)
		if len(q.pending) == 0 && q.active == 0 {
			// Nothing queued and nothing in flight: the queue has drained.
			q.state = stateDone
			q.lock.Unlock()
			return nil
		}
		// launch is called with q.lock held.
		q.launch(ctx, op)
		q.lock.Unlock()

		// Block until something changes: cancellation, a task finishing, or a
		// wake-up signal on q.in.
		select {
		case <-ctx.Done():
			q.lock.Lock()
			q.state = stateDone
			klog.V(2).Infof("Context is Done, exiting early (pending: %d active: %d): %v", len(q.pending), q.active, ctx.Err())
			q.lock.Unlock()
			return ctx.Err()
		case ri := <-q.done:
			// The tracer is invoked outside the lock.
			q.c.tracer(ri)
			q.lock.Lock()
			q.active--
			if ri.Err != nil {
				// First task failure aborts the run; remaining active tasks
				// are not awaited here.
				q.state = stateDone
				klog.V(2).Infof("Task error, exiting early (pending: %d active: %d): %v", len(q.pending), q.active, ri.Err)
				q.lock.Unlock()
				return ri.Err
			}
			q.lock.Unlock()
		case <-q.in:
			klog.V(4).Info("<-q.in")
			// Wake up from sleep to (maybe) launch new items.
		}
	}
}