in substrate/pkg/controller/substrate/controller.go [74:103]
func (c *Controller) Reconcile(ctx context.Context, substrate *v1alpha1.Substrate) error {
ctx, cancel := context.WithCancel(ctx)
var errs = make([]error, len(c.Resources))
workqueue.ParallelizeUntil(ctx, len(c.Resources), len(c.Resources), func(i int) {
for {
resource := c.Resources[i]
c.RLock()
mutable := substrate.DeepCopy()
c.RUnlock()
f := resource.Create
if substrate.DeletionTimestamp != nil {
f = resource.Delete
}
result, err := f(ctx, mutable)
if err != nil {
errs[i] = fmt.Errorf("reconciling %s, %w", reflect.ValueOf(resource).Elem().Type(), err)
cancel()
return
}
c.Lock()
runtime.Must(mergo.Merge(substrate, mutable))
c.Unlock()
if !result.Requeue && result.RequeueAfter == 0 {
return
}
time.Sleep(result.RequeueAfter + time.Second*1)
}
})
return multierr.Combine(errs...)
}