func()

in pkg/cloud/rgraph/algo/parallel_queue.go [152:199]


// Run drives the queue and blocks until one of the following happens:
//
//   - there are no pending items and no active tasks, in which case it
//     returns nil;
//   - ctx is done, in which case it returns ctx.Err();
//   - a completed task reports a non-nil error, in which case it returns
//     that error.
//
// In every case the queue state is set to stateDone before returning. Run
// does not wait for tasks that are still active when it returns early.
//
// Run may only be invoked once per queue; any call made when the state is not
// stateNotStarted returns an error without touching the queue.
func (q *ParallelQueue[T]) Run(ctx context.Context, op func(context.Context, T) error) error {
	q.lock.Lock()
	if q.state != stateNotStarted {
		// Snapshot the state while still holding the lock. Reading q.state
		// after Unlock would race with a concurrent Run call that is
		// updating the state under the lock.
		state := q.state
		q.lock.Unlock()
		return fmt.Errorf("Run() can only be called once (state=%d)", state)
	}
	q.state = stateRunning
	q.lock.Unlock()

	for {
		q.lock.Lock()

		klog.V(4).Infof("Run loop: pending: %d active: %d", len(q.pending), q.active)
		// Nothing queued and nothing in flight: the queue has drained.
		if len(q.pending) == 0 && q.active == 0 {
			q.state = stateDone
			q.lock.Unlock()
			return nil
		}
		// launch is called with the lock held; it is defined elsewhere and is
		// presumably what starts pending items with op.
		q.launch(ctx, op)

		q.lock.Unlock()

		// Block without holding the lock until there is something to react to.
		select {
		case <-ctx.Done():
			q.lock.Lock()
			q.state = stateDone
			klog.V(2).Infof("Context is Done, exiting early (pending: %d active: %d): %v", len(q.pending), q.active, ctx.Err())
			q.lock.Unlock()
			return ctx.Err()
		case ri := <-q.done:
			// The tracer callback runs outside the lock.
			q.c.tracer(ri)
			q.lock.Lock()
			q.active--

			// The first task error ends the run.
			if ri.Err != nil {
				q.state = stateDone
				klog.V(2).Infof("Task error, exiting early (pending: %d active: %d): %v", len(q.pending), q.active, ri.Err)
				q.lock.Unlock()
				return ri.Err
			}

			q.lock.Unlock()
		case <-q.in:
			klog.V(4).Info("<-q.in")
			// Wake up from sleep to (maybe) launch new items.
		}
	}
}