in pkg/scheduler/scheduler.go [102:235]
func (s *Scheduler) scheduleOnce(ctx context.Context, worker int) {
// Retrieve the next item (name of a CRP) from the work queue.
//
// Note that this will block if no item is available.
crpName, closed := s.queue.NextClusterResourcePlacementKey()
if closed {
// End the run immediately if the work queue has been closed.
klog.InfoS("Work queue has been closed")
return
}
defer func() {
// Mark the key as done.
//
// Note that this will happen even if an error occurs. Should the key get requeued by Add()
// during the call, it will be added to the queue after this call returns.
s.queue.Done(crpName)
}()
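// For reference, the Done/Forget/Add/AddRateLimited calls in this method mirror the
// standard client-go work queue pattern (a sketch, assuming a client-go style
// rate-limiting queue; queue and key are stand-in names):
//
//	key, shutdown := queue.Get()
//	if shutdown {
//		return // The queue has been closed.
//	}
//	defer queue.Done(key) // Permit the key to be re-added while it is in flight.
//	// ... process the key; Forget(key) on success, AddRateLimited(key) on failure.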
// Keep track of the number of active scheduling loops.
metrics.SchedulerActiveWorkers.WithLabelValues().Add(1)
defer metrics.SchedulerActiveWorkers.WithLabelValues().Add(-1)
startTime := time.Now()
crpRef := klog.KRef("", string(crpName))
klog.V(2).InfoS("Schedule once started", "clusterResourcePlacement", crpRef, "worker", worker)
defer func() {
// Note that the time spent on pulling keys from the work queue (and on waiting for a
// key to arrive) is not counted here, as we cannot reliably distinguish between system
// processing latency and time during which there is simply no cluster resource
// placement to schedule.
latency := time.Since(startTime).Milliseconds()
klog.V(2).InfoS("Schedule once completed", "clusterResourcePlacement", crpRef, "latency", latency, "worker", worker)
}()
// Retrieve the CRP.
crp := &fleetv1beta1.ClusterResourcePlacement{}
crpKey := types.NamespacedName{Name: string(crpName)}
if err := s.client.Get(ctx, crpKey, crp); err != nil {
// Wrap the error for metrics; this method does not return an error.
klog.ErrorS(controller.NewAPIServerError(true, err), "Failed to get cluster resource placement", "clusterResourcePlacement", crpRef)
if errors.IsNotFound(err) {
// The CRP was deleted before the scheduler got a chance to process it; normally this
// would not happen, as the event sources do not enqueue to the work queue any CRP that
// has been marked for deletion but no longer has the scheduler cleanup finalizer. Such
// CRPs need no further processing anyway, as the absence of the cleanup finalizer
// implies that the bindings derived from the CRP are no longer present.
return
}
// Requeue for later processing.
s.queue.AddRateLimited(crpName)
return
}
// Check if the CRP has been marked for deletion, and if it has the scheduler cleanup finalizer.
if crp.DeletionTimestamp != nil {
if controllerutil.ContainsFinalizer(crp, fleetv1beta1.SchedulerCRPCleanupFinalizer) {
if err := s.cleanUpAllBindingsFor(ctx, crp); err != nil {
klog.ErrorS(err, "Failed to clean up all bindings for cluster resource placement", "clusterResourcePlacement", crpRef)
// Requeue for later processing.
s.queue.AddRateLimited(crpName)
return
}
}
// The CRP has been marked for deletion but no longer has the scheduler cleanup finalizer; no
// additional handling is needed.
// Untrack the key from the rate limiter.
s.queue.Forget(crpName)
return
}
// The CRP has not been marked for deletion; run the scheduling cycle for it.
// Verify that it has an active policy snapshot.
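// (A policy snapshot is the versioned record of the CRP's scheduling policy; the
// scheduler always runs its cycle against the latest, i.e., active, snapshot.)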
latestPolicySnapshot, err := s.lookupLatestPolicySnapshot(ctx, crp)
if err != nil {
klog.ErrorS(err, "Failed to lookup latest policy snapshot", "clusterResourcePlacement", crpRef)
// No requeue is needed; the scheduler will be triggered again when an active policy
// snapshot is created.
// Untrack the key from the rate limiter so that future adds are not delayed by backoff.
s.queue.Forget(crpName)
return
}
// Add the scheduler cleanup finalizer to the CRP (if it does not have one yet).
if err := s.addSchedulerCleanUpFinalizer(ctx, crp); err != nil {
klog.ErrorS(err, "Failed to add scheduler cleanup finalizer", "clusterResourcePlacement", crpRef)
// Requeue for later processing.
s.queue.AddRateLimited(crpName)
return
}
// Run the scheduling cycle.
//
// Note that the scheduler will enter this cycle as long as the CRP is active and an active
// policy snapshot has been produced.
cycleStartTime := time.Now()
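// Going by the call sites below, the two boolean flags passed to
// observeSchedulingCycleMetrics appear to denote whether the cycle errored out and
// whether a requeue follows (an inference from usage, not from the helper's definition).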
res, err := s.framework.RunSchedulingCycleFor(ctx, crp.Name, latestPolicySnapshot)
if err != nil {
klog.ErrorS(err, "Failed to run scheduling cycle", "clusterResourcePlacement", crpRef)
// Requeue for later processing.
s.queue.AddRateLimited(crpName)
observeSchedulingCycleMetrics(cycleStartTime, true, false)
return
}
// Requeue if the scheduling cycle suggests so.
if res.Requeue {
if res.RequeueAfter > 0 {
s.queue.AddAfter(crpName, res.RequeueAfter)
observeSchedulingCycleMetrics(cycleStartTime, false, true)
return
}
// Untrack the key from the rate limiter.
s.queue.Forget(crpName)
// Requeue for later processing.
//
// Note that the key is added directly to the queue without waiting for the rate
// limiter's approval. This is necessary as requeues requested by the scheduler occur
// when the scheduler is certain that more scheduling work needs to be done but cannot
// complete it in one cycle (e.g., a plugin sets a per-cycle batch limit, so the
// scheduler must finish the scheduling over multiple cycles); in such cases, the rate
// limiter should not delay the requeues.
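// To summarize, the three requeue paths used in this method are: AddRateLimited for
// transient errors (subject to backoff), AddAfter for scheduler-requested delayed
// retries, and Add for scheduler-requested immediate retries (no backoff).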
s.queue.Add(crpName)
observeSchedulingCycleMetrics(cycleStartTime, false, true)
} else {
// The scheduling cycle completed successfully and no requeue was requested; untrack
// the key from the rate limiter so that future adds are not subject to backoff.
s.queue.Forget(crpName)
observeSchedulingCycleMetrics(cycleStartTime, false, false)
}
}
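// For context, a minimal sketch of how scheduleOnce might be driven (an illustration
// only; the actual worker wiring lives elsewhere, and numWorkers is a stand-in name):
// each worker goroutine re-runs the method until the work queue is closed, e.g. with
// wait.Until from k8s.io/apimachinery/pkg/util/wait:
//
//	for i := 0; i < numWorkers; i++ {
//		go func(worker int) {
//			// Loop with no delay between runs until ctx is cancelled; each call
//			// blocks until a key arrives or the queue is closed.
//			wait.Until(func() { s.scheduleOnce(ctx, worker) }, 0, ctx.Done())
//		}(i)
//	}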