func()

in internal/controllers/scheduling/controller.go [74:169]


func (c *controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := logr.FromContextOrDiscard(ctx)
	start := time.Now()
	defer func() {
		schedulingLatency.Observe(time.Since(start).Seconds())
	}()

	// Avoid conflict errors by waiting until we see the last dispatched synthesis (or timeout)
	if c.lastApplied != nil {
		ok, wait, err := c.lastApplied.HasBeenPatched(ctx, c.client, c.cacheGracePeriod)
		if err != nil {
			logger.Error(err, "checking cache for previous op")
			return ctrl.Result{}, err
		}
		if !ok {
			logger.V(1).Info("waiting for cache to reflect previous operation")
			return ctrl.Result{RequeueAfter: wait}, nil
		}
		c.lastApplied = nil
	}

	synths := &apiv1.SynthesizerList{}
	err := c.client.List(ctx, synths)
	if err != nil {
		logger.Error(err, "failed to list synthesizers")
		return ctrl.Result{}, err
	}
	synthsByName, synthEpoch := indexSynthesizers(synths.Items)

	comps := &apiv1.CompositionList{}
	err = c.client.List(ctx, comps)
	if err != nil {
		logger.Error(err, "failed to list compositions")
		return ctrl.Result{}, err
	}
	nextSlot := c.getNextCooldownSlot(comps)

	var inFlight int
	var op *op
	for _, comp := range comps.Items {
		comp := comp
		if comp.Synthesizing() {
			inFlight++
		}

		if missedReconciliation(&comp, c.watchdogThreshold) {
			stuckReconciling.WithLabelValues(comp.Spec.Synthesizer.Name).Inc()
		}

		synth, ok := synthsByName[comp.Spec.Synthesizer.Name]
		if !ok {
			continue
		}

		next := newOp(&synth, &comp, nextSlot)
		if next != nil && (op == nil || next.Less(op)) {
			op = next
		}
	}
	freeSynthesisSlots.Set(float64(c.concurrencyLimit - inFlight))

	if op == nil || inFlight >= c.concurrencyLimit {
		return ctrl.Result{}, nil
	}
	if !op.NotBefore.IsZero() { // the next op isn't ready to be dispathced yet
		if wait := time.Until(op.NotBefore); wait > 0 {
			return ctrl.Result{RequeueAfter: wait}, nil
		}
	}
	logger = logger.WithValues("compositionName", op.Composition.Name, "compositionNamespace", op.Composition.Namespace, "reason", op.Reason, "synthEpoch", synthEpoch)

	// Maintain ordering across synth/composition informers by doing a 2PC on the composition
	if op.Reason == synthesizerModifiedOp && setSynthEpochAnnotation(op.Composition, synthEpoch) {
		if err := c.client.Update(ctx, op.Composition); err != nil {
			logger.Error(err, "updating synthesizer epoch")
			return ctrl.Result{}, err
		}
		logger.V(1).Info("updated global synthesizer epoch")
		return ctrl.Result{}, nil
	}

	if err := c.dispatchOp(ctx, op); err != nil {
		if errors.IsInvalid(err) {
			logger.Error(err, "conflict while dispatching synthesis")
			return ctrl.Result{}, err
		}
		logger.Error(err, "dispatching synthesis operation")
		return ctrl.Result{}, err
	}

	op.Dispatched = time.Now()
	c.lastApplied = op
	logger.V(0).Info("dispatched synthesis", "synthesisUUID", op.id)

	return ctrl.Result{}, nil
}