func()

in common/controllers/cronanything_controller.go [119:352]


// Reconcile handles a single CronAnything resource identified by request.
//
// It looks up the child resources created for the CronAnything, cleans up
// finished ones, and then decides what to do with the most recent schedule
// time that has not been handled yet: create a new child resource from the
// template, or record in the trigger history why no resource was created
// (older triggers dropped, trigger deadline exceeded, ForbidConcurrent policy,
// or total resource limit reached).
//
// An empty result and nil error are returned when the resource is not found,
// is being deleted, or is suspended. With the ReplaceConcurrent policy and
// active child resources present, the active resources are deleted and the
// request is requeued after one second. In the remaining non-error cases the
// result comes from updateTriggerTimes for the next schedule time. Any error
// is returned together with an empty result.
func (r *ReconcileCronAnything) Reconcile(_ context.Context, request reconcile.Request) (reconcile.Result, error) {
	log := r.Log.WithValues("cronanything-controller", request.NamespacedName)
	instance, err := r.cronanythingControl.Get(request.NamespacedName)
	if err != nil {
		if errors.IsNotFound(err) {
			// The resource no longer exists, so there is nothing to reconcile.
			return reconcile.Result{}, nil
		}
		return reconcile.Result{}, err
	}

	canonicalName := fmt.Sprintf("%s/%s", instance.GetNamespace(), instance.GetName())
	now := r.currentTime()

	if instance.GetDeletionTimestamp() != nil {
		log.Info("Not creating resource because it is being deleted", "caName", canonicalName)
		return reconcile.Result{}, nil
	}

	crt, err := templateToUnstructured(instance)
	if err != nil {
		return reconcile.Result{}, err
	}

	cgvr, found := r.resourceResolver.Resolve(crt.GroupVersionKind())
	if !found {
		return reconcile.Result{}, fmt.Errorf("unable to resolve child resource for %s", canonicalName)
	}

	// Look up all child resources of the cronanything resource. These are needed to make sure the
	// controller adheres to the concurrency policy, to clean up completed resources as specified
	// in the historyLimit and control the total number of child resources as specified in totalResourceLimit
	childResources, err := r.getChildResources(instance, crt, cgvr)
	if err != nil {
		return reconcile.Result{}, err
	}

	// Clean up finished resources. Do this before checking the total resource usage to make sure
	// finished resources that should be deleted do not count against the total limit.
	childResources = r.cleanupHistory(instance, childResources, cgvr, now)

	// Just return without doing any work if it is suspended.
	if instance.CronAnythingSpec().Suspend != nil && *instance.CronAnythingSpec().Suspend {
		return reconcile.Result{}, nil
	}

	unmetScheduleTimes, nextScheduleTime, err := getScheduleTimes(instance, now.Add(1*time.Second))
	if err != nil {
		return reconcile.Result{}, err
	}

	if len(unmetScheduleTimes) == 0 {
		log.Info("No unmet trigger times", "caName", canonicalName)
		return r.updateTriggerTimes(canonicalName, nextScheduleTime), nil
	}

	// Only the most recent unmet schedule time is acted upon; all earlier ones are dropped.
	scheduleTime := unmetScheduleTimes[len(unmetScheduleTimes)-1]
	droppedSchedules := unmetScheduleTimes[:len(unmetScheduleTimes)-1]

	// finishTrigger records the final outcome for scheduleTime in the status (last schedule
	// time, cleared pending trigger, new history entry) and returns the result for the next
	// schedule time. It is shared by every path that concludes the current trigger with a
	// fixed result.
	finishTrigger := func(record cronanything.TriggerHistoryRecord) (reconcile.Result, error) {
		statusErr := r.updateCronAnythingStatus(instance.GetName(), instance.GetNamespace(), func(freshStatus *cronanything.CronAnythingStatus) {
			updateLastScheduleTime(freshStatus, scheduleTime)

			freshStatus.PendingTrigger = nil

			addToTriggerHistory(freshStatus, record)
		})
		if statusErr != nil {
			return reconcile.Result{}, statusErr
		}
		return r.updateTriggerTimes(canonicalName, nextScheduleTime), nil
	}

	if len(droppedSchedules) > 0 {
		log.Info("Dropping unmet triggers", "caName", canonicalName, "count", len(droppedSchedules))
		latestDropped := droppedSchedules[len(droppedSchedules)-1]

		historyRecord := cronanything.TriggerHistoryRecord{
			ScheduleTime:      metav1.NewTime(latestDropped),
			CreationTimestamp: metav1.NewTime(now),
		}

		pending := instance.CronAnythingStatus().PendingTrigger
		if len(droppedSchedules) == 1 && pending != nil && pending.ScheduleTime.Equal(getMetaTimePointer(latestDropped)) {
			// If we get here it means we have one dropped trigger and also we have a trigger that
			// we haven't been able to complete. Use the info from the pending trigger to report this
			// in the trigger history.
			historyRecord.Result = pending.Result
		} else {
			historyRecord.Result = cronanything.TriggerResultMissed
		}

		if err := r.updateCronAnythingStatus(instance.GetName(), instance.GetNamespace(), func(freshStatus *cronanything.CronAnythingStatus) {
			updateLastScheduleTime(freshStatus, latestDropped)
			freshStatus.PendingTrigger = nil
			addToTriggerHistory(freshStatus, historyRecord)
		}); err != nil {
			// Since we haven't done anything yet, we can safely just return the error here and let the controller
			// retry.
			return reconcile.Result{}, err
		}
	}

	log.Info("Unmet trigger time", "caName", canonicalName, "scheduledTime", scheduleTime.Format(time.RFC3339))

	if instance.CronAnythingSpec().TriggerDeadlineSeconds != nil {
		deadline := scheduleTime.Add(time.Duration(*instance.CronAnythingSpec().TriggerDeadlineSeconds) * time.Second)
		if deadline.Before(now) {
			log.Info("Trigger deadline exceeded", "caName", canonicalName, "triggerDeadline", deadline.Format(time.RFC3339), "scheduleTime", scheduleTime.Format(time.RFC3339))
			err = r.updateCronAnythingStatus(instance.GetName(), instance.GetNamespace(), func(freshStatus *cronanything.CronAnythingStatus) {
				updateLastScheduleTime(freshStatus, scheduleTime)

				historyRecord := cronanything.TriggerHistoryRecord{
					ScheduleTime:      metav1.NewTime(scheduleTime),
					CreationTimestamp: metav1.NewTime(now),
				}

				// Prefer the result of a pending trigger for this schedule time (read from the
				// fresh status) over the generic deadline-exceeded result.
				if freshStatus.PendingTrigger != nil && freshStatus.PendingTrigger.ScheduleTime.Equal(getMetaTimePointer(scheduleTime)) {
					historyRecord.Result = freshStatus.PendingTrigger.Result
				} else {
					historyRecord.Result = cronanything.TriggerResultDeadlineExceeded
				}
				addToTriggerHistory(freshStatus, historyRecord)

				freshStatus.PendingTrigger = nil
			})
			if err != nil {
				return reconcile.Result{}, err
			}
			return r.updateTriggerTimes(canonicalName, nextScheduleTime), nil
		}
	}

	activeChildResources := findActiveChildResources(instance, childResources, log)
	log.Info("Found active child resources", "caName", canonicalName, "numActiveChildResources", len(activeChildResources))
	if len(activeChildResources) > 0 {
		// Any policy other than the two handled here falls through to resource creation.
		switch instance.CronAnythingSpec().ConcurrencyPolicy {
		case cronanything.ForbidConcurrent:
			log.Info("Found existing active resource, so no new scheduled due to ForbidConcurrent policy", "caName", canonicalName)
			return finishTrigger(cronanything.TriggerHistoryRecord{
				ScheduleTime:      metav1.NewTime(scheduleTime),
				CreationTimestamp: metav1.NewTime(now),
				Result:            cronanything.TriggerResultForbidConcurrent,
			})
		case cronanything.ReplaceConcurrent:
			// All currently active resources should be replaced. We do this by deleting them.
			for _, activeResource := range activeChildResources {
				// No need to delete the resource if it is already in the process of being deleted.
				if activeResource.GetDeletionTimestamp() != nil {
					continue
				}
				log.Info("Deleting resource due to ReplaceConcurrent policy", "caName", canonicalName, "activeResource", activeResource.GetName())
				err = r.resourceControl.Delete(cgvr, activeResource.GetNamespace(), activeResource.GetName())
				if err != nil {
					r.eventRecorder.Eventf(instance, v1.EventTypeWarning, "FailedDeleteForReplace", "Error deleting resource %s: %v", activeResource.GetName(), err)
					return reconcile.Result{}, err
				}
				r.eventRecorder.Eventf(instance, v1.EventTypeNormal, "DeletedForReplace", "Resource %s deleted due to Replace policy", activeResource.GetName())
			}
			// Returning here. Next iteration the resources will (hopefully) have been deleted and a new object can
			// be created. If the deletion is not completed, it will have to wait longer.
			return reconcile.Result{
				RequeueAfter: 1 * time.Second,
			}, nil
		}
	}

	if instance.CronAnythingSpec().TotalResourceLimit != nil && int32(len(childResources)) >= *instance.CronAnythingSpec().TotalResourceLimit {
		resourceLimit := *instance.CronAnythingSpec().TotalResourceLimit
		log.Info("Resource limit info", "caName", canonicalName, "limit", resourceLimit, "numChildResources", len(childResources))
		// Report the configured limit; the number of child resources may exceed it.
		r.eventRecorder.Eventf(instance, v1.EventTypeWarning, "ResourceLimitReached", "Limit of %d resources has been reached", resourceLimit)
		log.Info("Resource limit has been reached. No new resource can be created", "caName", canonicalName)
		return finishTrigger(cronanything.TriggerHistoryRecord{
			ScheduleTime:      metav1.NewTime(scheduleTime),
			CreationTimestamp: metav1.NewTime(now),
			Result:            cronanything.TriggerResultResourceLimitReached,
		})
	}

	// Turn the template into the concrete child resource for this schedule time.
	name := getResourceName(instance, scheduleTime)

	crt.SetName(name)
	labels := crt.GetLabels()
	if labels == nil {
		labels = make(map[string]string)
	}
	labels[cronanything.CronAnythingCreatedByLabel] = instance.GetName()
	labels[cronanything.CronAnythingScheduleTimeLabel] = strconv.FormatInt(scheduleTime.Unix(), 10)
	crt.SetLabels(labels)
	if instance.CronAnythingSpec().CascadeDelete != nil && *instance.CronAnythingSpec().CascadeDelete {
		crt.SetOwnerReferences([]metav1.OwnerReference{*metav1.NewControllerRef(instance, controllerKind)})
	}

	err = r.resourceControl.Create(cgvr, instance.GetNamespace(), crt)
	if err != nil {
		// Remember the failed attempt as a pending trigger so that a later reconcile can report
		// it in the trigger history. A failure to store it is only logged; the create error is
		// the one returned.
		statusErr := r.updateCronAnythingStatus(instance.GetName(), instance.GetNamespace(), func(freshStatus *cronanything.CronAnythingStatus) {
			freshStatus.PendingTrigger = &cronanything.PendingTrigger{
				ScheduleTime: metav1.NewTime(scheduleTime),
				Result:       cronanything.TriggerResultCreateFailed,
			}
		})
		if statusErr != nil {
			log.Error(statusErr, "Failed to update status for CronAnything after failed create attempt", "caName", canonicalName)
		}
		r.eventRecorder.Eventf(instance, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
		return reconcile.Result{}, err
	}
	log.Info("Created new resource", "caName", canonicalName, "childName", name)
	r.eventRecorder.Eventf(instance, v1.EventTypeNormal, "SuccessfulCreate", "Created resource %s", name)

	return finishTrigger(cronanything.TriggerHistoryRecord{
		ScheduleTime:      metav1.NewTime(scheduleTime),
		CreationTimestamp: metav1.NewTime(now),
		Result:            cronanything.TriggerResultCreateSucceeded,
	})
}