func doInternal()

in pkg/cloud/rgraph/algo/trclosure/transitive_closure.go [89:147]


// doInternal expands gr to the set of Nodes reachable from the Nodes it
// already contains, following the outgoing references reported by syncNode.
//
// Every Node currently in gr seeds the work queue. The worker function is then
// handed to pq.Run, which is expected to invoke it for each queued item,
// possibly concurrently (hence graphLock below). For each item the worker
// calls syncNode and, for every returned reference whose target is not yet in
// gr, creates a builder for the target, adds it to gr and enqueues it.
//
// opts are folded into the config that is passed to syncNode.
//
// An error is returned when pq refuses new work (pq.Add returns false), when
// syncNode fails, when a builder cannot be created for a referenced ID, or
// when pq.Run returns an error.
func doInternal(
	ctx context.Context,
	cl cloud.Cloud,
	gr *rgraph.Builder,
	pq *algo.ParallelQueue[work],
	opts ...Option,
) error {
	config := makeConfig(opts...)

	// Seed the queue with everything that is already in the graph.
	for _, nb := range gr.All() {
		if !pq.Add(work{b: nb}) {
			return fmt.Errorf("parallel queue is done")
		}
	}

	// graphLock is held when updating gr (rgraph.Builder).
	//
	// Invariant: We traverse and add each Node exactly once. We maintain this
	// by holding graphLock while checking and potentially adding the newly
	// traversed Nodes to the graph.
	var graphLock sync.Mutex

	fn := func(ctx context.Context, w work) error {
		outRefs, err := syncNode(ctx, cl, config, w.b)
		if err != nil {
			return err
		}

		for _, ref := range outRefs {
			// The check-then-add on gr must be atomic. Running it in its own
			// function scope lets a single deferred Unlock cover every exit
			// path (including panics) instead of unlocking by hand at each
			// return site.
			next, added, err := func() (work, bool, error) {
				graphLock.Lock()
				defer graphLock.Unlock()

				if gr.Get(ref.To) != nil {
					// We have already fetched the Node, don't need to add to
					// the graph and the work queue.
					klog.V(2).Infof("ref.To %+v is already in the graph, ignoring", ref)
					return work{}, false, nil
				}

				toNode, err := all.NewBuilderByID(ref.To)
				if err != nil {
					return work{}, false, makeErr("%w", err)
				}

				// Add the untraversed node to the graph.
				klog.V(2).Infof("ref.To %+v has not been traversed, adding to graph", ref)
				gr.Add(toNode)

				return work{b: toNode}, true, nil
			}()
			if err != nil {
				return err
			}
			if !added {
				continue
			}

			// Enqueue outside the lock so graphLock is never held while
			// calling into the queue.
			if !pq.Add(next) {
				return fmt.Errorf("parallel queue is done")
			}
		}

		return nil
	}

	return pq.Run(ctx, fn)
}