func()

in pkg/scheduler/scheduler.go [102:235]


func (s *Scheduler) scheduleOnce(ctx context.Context, worker int) {
	// Retrieve the next item (name of a CRP) from the work queue.
	//
	// Note that this will block if no item is available.
	crpName, closed := s.queue.NextClusterResourcePlacementKey()
	if closed {
		// End the run immediately if the work queue has been closed.
		klog.InfoS("Work queue has been closed")
		return
	}

	defer func() {
		// Mark the key as done.
		//
		// Note that this will happen even if an error occurs. Should the key get requeued by Add()
		// during the call, it will be added to the queue after this call returns.
		s.queue.Done(crpName)
	}()

	// keep track of the number of active scheduling loop
	metrics.SchedulerActiveWorkers.WithLabelValues().Add(1)
	defer metrics.SchedulerActiveWorkers.WithLabelValues().Add(-1)

	startTime := time.Now()
	crpRef := klog.KRef("", string(crpName))
	klog.V(2).InfoS("Schedule once started", "clusterResourcePlacement", crpRef, "worker", worker)
	defer func() {
		// Note that the time spent on pulling keys from the work queue (and the time spent on waiting
		// for a key to arrive) is not counted here, as we cannot reliably distinguish between
		// system processing latencies and actual duration of cluster resource placement absence.
		latency := time.Since(startTime).Milliseconds()
		klog.V(2).InfoS("Schedule once completed", "clusterResourcePlacement", crpRef, "latency", latency, "worker", worker)
	}()

	// Retrieve the CRP.
	crp := &fleetv1beta1.ClusterResourcePlacement{}
	crpKey := types.NamespacedName{Name: string(crpName)}
	if err := s.client.Get(ctx, crpKey, crp); err != nil {
		// Wrap the error for metrics; this method does not return an error.
		klog.ErrorS(controller.NewAPIServerError(true, err), "Failed to get cluster resource placement", "clusterResourcePlacement", crpRef)

		if errors.IsNotFound(err) {
			// The CRP has been gone before the scheduler gets a chance to
			// process it; normally this would not happen as sources would not enqueue any CRP that
			// has been marked for deletion but does not have the scheduler cleanup finalizer to
			// the work queue. Such CRPs needs no further processing any way though, as the absence
			// of the cleanup finalizer implies that bindings derived from the CRP are no longer present.
			return
		}

		// Requeue for later processing.
		s.queue.AddRateLimited(crpName)
		return
	}

	// Check if the CRP has been marked for deletion, and if it has the scheduler cleanup finalizer.
	if crp.DeletionTimestamp != nil {
		if controllerutil.ContainsFinalizer(crp, fleetv1beta1.SchedulerCRPCleanupFinalizer) {
			if err := s.cleanUpAllBindingsFor(ctx, crp); err != nil {
				klog.ErrorS(err, "Failed to clean up all bindings for cluster resource placement", "clusterResourcePlacement", crpRef)
				// Requeue for later processing.
				s.queue.AddRateLimited(crpName)
				return
			}
		}
		// The CRP has been marked for deletion but no longer has the scheduler cleanup finalizer; no
		// additional handling is needed.

		// Untrack the key from the rate limiter.
		s.queue.Forget(crpName)
		return
	}

	// The CRP has not been marked for deletion; run the scheduling cycle for it.

	// Verify that it has an active policy snapshot.
	latestPolicySnapshot, err := s.lookupLatestPolicySnapshot(ctx, crp)
	if err != nil {
		klog.ErrorS(err, "Failed to lookup latest policy snapshot", "clusterResourcePlacement", crpRef)
		// No requeue is needed; the scheduler will be triggered again when an active policy
		// snapshot is created.

		// Untrack the key for quicker reprocessing.
		s.queue.Forget(crpName)
		return
	}

	// Add the scheduler cleanup finalizer to the CRP (if it does not have one yet).
	if err := s.addSchedulerCleanUpFinalizer(ctx, crp); err != nil {
		klog.ErrorS(err, "Failed to add scheduler cleanup finalizer", "clusterResourcePlacement", crpRef)
		// Requeue for later processing.
		s.queue.AddRateLimited(crpName)
		return
	}

	// Run the scheduling cycle.
	//
	// Note that the scheduler will enter this cycle as long as the CRP is active and an active
	// policy snapshot has been produced.
	cycleStartTime := time.Now()
	res, err := s.framework.RunSchedulingCycleFor(ctx, crp.Name, latestPolicySnapshot)
	if err != nil {
		klog.ErrorS(err, "Failed to run scheduling cycle", "clusterResourcePlacement", crpRef)
		// Requeue for later processing.
		s.queue.AddRateLimited(crpName)
		observeSchedulingCycleMetrics(cycleStartTime, true, false)
		return
	}

	// Requeue if the scheduling cycle suggests so.
	if res.Requeue {
		if res.RequeueAfter > 0 {
			s.queue.AddAfter(crpName, res.RequeueAfter)
			observeSchedulingCycleMetrics(cycleStartTime, false, true)
			return
		}
		// Untrack the key from the rate limiter.
		s.queue.Forget(crpName)
		// Requeue for later processing.
		//
		// Note that the key is added directly to the queue without having to wait for any rate limiter's
		// approval. This is necessary as requeues, requested by the scheduler, occur when the scheduler
		// is certain that more scheduling work needs to be done but it cannot be completed in
		// one cycle (e.g., a plugin sets up a per-cycle batch limit, and consequently the scheduler must
		// finish the scheduling in multiple cycles); in such cases, rate limiter should not add
		// any delay to the requeues.
		s.queue.Add(crpName)
		observeSchedulingCycleMetrics(cycleStartTime, false, true)
	} else {
		// no more failure, the following queue don't need to be rate limited
		s.queue.Forget(crpName)
		observeSchedulingCycleMetrics(cycleStartTime, false, false)
	}
}