in internal/controllers/scheduling/controller.go [74:169]
func (c *controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := logr.FromContextOrDiscard(ctx)
start := time.Now()
defer func() {
schedulingLatency.Observe(time.Since(start).Seconds())
}()
// Avoid conflict errors by waiting until we see the last dispatched synthesis (or timeout)
if c.lastApplied != nil {
ok, wait, err := c.lastApplied.HasBeenPatched(ctx, c.client, c.cacheGracePeriod)
if err != nil {
logger.Error(err, "checking cache for previous op")
return ctrl.Result{}, err
}
if !ok {
logger.V(1).Info("waiting for cache to reflect previous operation")
return ctrl.Result{RequeueAfter: wait}, nil
}
c.lastApplied = nil
}
synths := &apiv1.SynthesizerList{}
err := c.client.List(ctx, synths)
if err != nil {
logger.Error(err, "failed to list synthesizers")
return ctrl.Result{}, err
}
synthsByName, synthEpoch := indexSynthesizers(synths.Items)
comps := &apiv1.CompositionList{}
err = c.client.List(ctx, comps)
if err != nil {
logger.Error(err, "failed to list compositions")
return ctrl.Result{}, err
}
nextSlot := c.getNextCooldownSlot(comps)
var inFlight int
var op *op
for _, comp := range comps.Items {
comp := comp
if comp.Synthesizing() {
inFlight++
}
if missedReconciliation(&comp, c.watchdogThreshold) {
stuckReconciling.WithLabelValues(comp.Spec.Synthesizer.Name).Inc()
}
synth, ok := synthsByName[comp.Spec.Synthesizer.Name]
if !ok {
continue
}
next := newOp(&synth, &comp, nextSlot)
if next != nil && (op == nil || next.Less(op)) {
op = next
}
}
freeSynthesisSlots.Set(float64(c.concurrencyLimit - inFlight))
if op == nil || inFlight >= c.concurrencyLimit {
return ctrl.Result{}, nil
}
if !op.NotBefore.IsZero() { // the next op isn't ready to be dispathced yet
if wait := time.Until(op.NotBefore); wait > 0 {
return ctrl.Result{RequeueAfter: wait}, nil
}
}
logger = logger.WithValues("compositionName", op.Composition.Name, "compositionNamespace", op.Composition.Namespace, "reason", op.Reason, "synthEpoch", synthEpoch)
// Maintain ordering across synth/composition informers by doing a 2PC on the composition
if op.Reason == synthesizerModifiedOp && setSynthEpochAnnotation(op.Composition, synthEpoch) {
if err := c.client.Update(ctx, op.Composition); err != nil {
logger.Error(err, "updating synthesizer epoch")
return ctrl.Result{}, err
}
logger.V(1).Info("updated global synthesizer epoch")
return ctrl.Result{}, nil
}
if err := c.dispatchOp(ctx, op); err != nil {
if errors.IsInvalid(err) {
logger.Error(err, "conflict while dispatching synthesis")
return ctrl.Result{}, err
}
logger.Error(err, "dispatching synthesis operation")
return ctrl.Result{}, err
}
op.Dispatched = time.Now()
c.lastApplied = op
logger.V(0).Info("dispatched synthesis", "synthesisUUID", op.id)
return ctrl.Result{}, nil
}