in pkg/cloud/rgraph/algo/trclosure/transitive_closure.go [89:147]
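// doInternal computes the transitive closure of the resource graph: it
// traverses the outgoing references of the Nodes already in gr, fetching and
// adding each reachable Node exactly once.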
func doInternal(
	ctx context.Context,
	cl cloud.Cloud,
	gr *rgraph.Builder,
	pq *algo.ParallelQueue[work],
	opts ...Option,
) error {
	config := makeConfig(opts...)
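	// Seed the work queue with the Nodes already present in the graph.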
	for _, nb := range gr.All() {
		if ok := pq.Add(work{b: nb}); !ok {
			return fmt.Errorf("parallel queue is done")
		}
	}
	// graphLock is held when updating gr (rgraph.Builder).
	//
	// Invariant: each Node is traversed and added exactly once. We maintain
	// this by holding graphLock while checking for and adding newly
	// discovered Nodes to the graph.
	var graphLock sync.Mutex
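	// fn syncs a single Node and enqueues any of its outgoing references
	// that are not yet in the graph.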
	fn := func(ctx context.Context, w work) error {
		outRefs, err := syncNode(ctx, cl, config, w.b)
		if err != nil {
			return err
		}
		for _, ref := range outRefs {
			graphLock.Lock()
			if gr.Get(ref.To) != nil {
				// The Node has already been fetched; no need to add it to
				// the graph or the work queue.
				klog.V(2).Infof("ref.To %+v is already in the graph, ignoring", ref)
				graphLock.Unlock()
				continue
			}
			toNode, err := all.NewBuilderByID(ref.To)
			if err != nil {
				graphLock.Unlock()
				return makeErr("%w", err)
			}
			// Add the untraversed node to the graph.
			klog.V(2).Infof("ref.To %+v has not been traversed, adding to graph", ref)
			gr.Add(toNode)
			graphLock.Unlock()
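			// Enqueue the new Node for traversal; graphLock has already been
			// released, so the enqueue does not run under the lock.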
			if ok := pq.Add(work{b: toNode}); !ok {
				return fmt.Errorf("parallel queue is done")
			}
		}
		return nil
	}
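	// Process the queued work items with fn in parallel.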
	return pq.Run(ctx, fn)
}