in common/controllers/cronanything_controller.go [119:352]
// Reconcile processes one reconcile request for the CronAnything resource named in request.
// The context argument is ignored.
//
// The steps, in order:
//   - Fetch the instance. A not-found error ends the reconcile successfully; an instance with a
//     deletion timestamp is skipped.
//   - Convert the template to an unstructured object, resolve its resource, and list the
//     existing child resources.
//   - Clean up finished child resources through cleanupHistory.
//   - Stop if the instance is suspended.
//   - Compute the unmet schedule times. Only the most recent one is acted upon; the earlier ones
//     are dropped and the latest of those is recorded in the trigger history.
//   - Skip the trigger (recording the reason in the trigger history) if the trigger deadline has
//     passed, if the concurrency policy forbids it, or if the total resource limit is reached.
//   - Otherwise create the child resource and record the outcome in the status.
//
// Most successful paths return the result of updateTriggerTimes for the next schedule time. The
// exceptions are the early exits for not-found, deletion and suspension, which return an empty
// result, and the ReplaceConcurrent path, which requeues after one second. Errors are returned
// unchanged so that the controller retries.
func (r *ReconcileCronAnything) Reconcile(_ context.Context, request reconcile.Request) (reconcile.Result, error) {
	log := r.Log.WithValues("cronanything-controller", request.NamespacedName)

	instance, err := r.cronanythingControl.Get(request.NamespacedName)
	if err != nil {
		if errors.IsNotFound(err) {
			// Nothing to reconcile for a resource that cannot be found.
			return reconcile.Result{}, nil
		}
		return reconcile.Result{}, err
	}

	canonicalName := fmt.Sprintf("%s/%s", instance.GetNamespace(), instance.GetName())
	now := r.currentTime()

	if instance.GetDeletionTimestamp() != nil {
		log.Info("Not creating resource because it is being deleted", "caName", canonicalName)
		return reconcile.Result{}, nil
	}

	crt, err := templateToUnstructured(instance)
	if err != nil {
		return reconcile.Result{}, err
	}

	cgvr, found := r.resourceResolver.Resolve(crt.GroupVersionKind())
	if !found {
		return reconcile.Result{}, fmt.Errorf("unable to resolve child resource for %s", canonicalName)
	}

	// Look up all child resources of the cronanything resource. These are needed to make sure the
	// controller adheres to the concurrency policy, to clean up completed resources as specified
	// in the historyLimit and control the total number of child resources as specified in totalResourceLimit
	childResources, err := r.getChildResources(instance, crt, cgvr)
	if err != nil {
		return reconcile.Result{}, err
	}

	// Cleanup finished resources. Do this before checking the total resource usage to make sure
	// finished resources that should be deleted do not count against the total limit.
	childResources = r.cleanupHistory(instance, childResources, cgvr, now)

	// Just return without doing any work if it is suspended.
	if instance.CronAnythingSpec().Suspend != nil && *instance.CronAnythingSpec().Suspend {
		return reconcile.Result{}, nil
	}

	unmetScheduleTimes, nextScheduleTime, err := getScheduleTimes(instance, now.Add(1*time.Second))
	if err != nil {
		return reconcile.Result{}, err
	}

	if len(unmetScheduleTimes) == 0 {
		log.Info("No unmet trigger times", "caName", canonicalName)
		return r.updateTriggerTimes(canonicalName, nextScheduleTime), nil
	}

	// Only the most recent unmet schedule time is acted upon; all earlier ones are dropped.
	scheduleTime := unmetScheduleTimes[len(unmetScheduleTimes)-1]
	droppedSchedules := unmetScheduleTimes[:len(unmetScheduleTimes)-1]

	if len(droppedSchedules) > 0 {
		log.Info("Dropping unmet triggers", "caName", canonicalName, "count", len(droppedSchedules))
		latestDropped := droppedSchedules[len(droppedSchedules)-1]

		historyRecord := cronanything.TriggerHistoryRecord{
			ScheduleTime:      metav1.NewTime(latestDropped),
			CreationTimestamp: metav1.NewTime(now),
		}

		if len(droppedSchedules) == 1 &&
			instance.CronAnythingStatus().PendingTrigger != nil &&
			instance.CronAnythingStatus().PendingTrigger.ScheduleTime.Equal(getMetaTimePointer(latestDropped)) {
			// If we get here it means we have one dropped trigger and also we have a trigger that
			// we haven't been able to complete. Use the info from the pending trigger to report this
			// in the trigger history.
			historyRecord.Result = instance.CronAnythingStatus().PendingTrigger.Result
		} else {
			historyRecord.Result = cronanything.TriggerResultMissed
		}

		err := r.updateCronAnythingStatus(instance.GetName(), instance.GetNamespace(), func(freshStatus *cronanything.CronAnythingStatus) {
			updateLastScheduleTime(freshStatus, latestDropped)
			freshStatus.PendingTrigger = nil
			addToTriggerHistory(freshStatus, historyRecord)
		})
		if err != nil {
			// Since we haven't done anything yet, we can safely just return the error here and let the controller
			// retry.
			return reconcile.Result{}, err
		}
	}

	log.Info("Unmet trigger time", "caName", canonicalName, "scheduledTime", scheduleTime.Format(time.RFC3339))

	if instance.CronAnythingSpec().TriggerDeadlineSeconds != nil {
		// The spec value is a number of seconds; it is scaled to a Duration when added below.
		triggerDeadline := time.Duration(*instance.CronAnythingSpec().TriggerDeadlineSeconds)
		if scheduleTime.Add(triggerDeadline * time.Second).Before(now) {
			log.Info("Trigger deadline exceeded", "caName", canonicalName, "scheduleTime", scheduleTime.Format(time.RFC3339))
			err = r.updateCronAnythingStatus(instance.GetName(), instance.GetNamespace(), func(freshStatus *cronanything.CronAnythingStatus) {
				updateLastScheduleTime(freshStatus, scheduleTime)

				historyRecord := cronanything.TriggerHistoryRecord{
					ScheduleTime:      metav1.NewTime(scheduleTime),
					CreationTimestamp: metav1.NewTime(now),
				}

				// Prefer the result of a pending trigger for this same schedule time, if there is
				// one, over the generic deadline-exceeded result.
				if freshStatus.PendingTrigger != nil && freshStatus.PendingTrigger.ScheduleTime.Equal(getMetaTimePointer(scheduleTime)) {
					historyRecord.Result = freshStatus.PendingTrigger.Result
				} else {
					historyRecord.Result = cronanything.TriggerResultDeadlineExceeded
				}

				addToTriggerHistory(freshStatus, historyRecord)
				freshStatus.PendingTrigger = nil
			})
			if err != nil {
				return reconcile.Result{}, err
			}
			return r.updateTriggerTimes(canonicalName, nextScheduleTime), nil
		}
	}

	activeChildResources := findActiveChildResources(instance, childResources, log)
	log.Info("Found active child resources", "caName", canonicalName, "numActiveChildResources", len(activeChildResources))

	if len(activeChildResources) > 0 {
		// Any policy other than the two handled below falls through to resource creation.
		switch instance.CronAnythingSpec().ConcurrencyPolicy {
		case cronanything.ForbidConcurrent:
			log.Info("Found existing active resource, so no new scheduled due to ForbidConcurrent policy", "caName", canonicalName)
			err = r.updateCronAnythingStatus(instance.GetName(), instance.GetNamespace(), func(freshStatus *cronanything.CronAnythingStatus) {
				updateLastScheduleTime(freshStatus, scheduleTime)
				freshStatus.PendingTrigger = nil
				addToTriggerHistory(freshStatus, cronanything.TriggerHistoryRecord{
					ScheduleTime:      metav1.NewTime(scheduleTime),
					CreationTimestamp: metav1.NewTime(now),
					Result:            cronanything.TriggerResultForbidConcurrent,
				})
			})
			if err != nil {
				return reconcile.Result{}, err
			}
			return r.updateTriggerTimes(canonicalName, nextScheduleTime), nil

		case cronanything.ReplaceConcurrent:
			// All currently active resources should be replaced. We do this by deleting them.
			for _, activeResource := range activeChildResources {
				// No need to delete resource if it is already in the process of being deleted.
				if activeResource.GetDeletionTimestamp() != nil {
					continue
				}
				log.Info("Deleting resource due to ReplaceConcurrent policy", "caName", canonicalName, "activeResource", activeResource.GetName())
				err = r.resourceControl.Delete(cgvr, activeResource.GetNamespace(), activeResource.GetName())
				if err != nil {
					r.eventRecorder.Eventf(instance, v1.EventTypeWarning, "FailedDeleteForReplace", "Error deleting resource %s: %v", activeResource.GetName(), err)
					return reconcile.Result{}, err
				}
				r.eventRecorder.Eventf(instance, v1.EventTypeNormal, "DeletedForReplace", "Resource %s deleted due to Replace policy", activeResource.GetName())
			}

			// Returning here. Next iteration the resources will (hopefully) have been deleted and a new object can
			// be created. If the deletion is not completed, it will have to wait longer.
			return reconcile.Result{
				RequeueAfter: 1 * time.Second,
			}, nil
		}
	}

	// Compare as int so the child count is never narrowed, and report the configured limit (not
	// the current child count) in the event message.
	if limit := instance.CronAnythingSpec().TotalResourceLimit; limit != nil && len(childResources) >= int(*limit) {
		log.Info("Resource limit info", "caName", canonicalName, "limit", *limit, "numChildResources", len(childResources))
		r.eventRecorder.Eventf(instance, v1.EventTypeWarning, "ResourceLimitReached", "Limit of %d resources has been reached", *limit)
		log.Info("Resource limit has been reached. No new resource can be created", "caName", canonicalName)
		err = r.updateCronAnythingStatus(instance.GetName(), instance.GetNamespace(), func(freshStatus *cronanything.CronAnythingStatus) {
			updateLastScheduleTime(freshStatus, scheduleTime)
			freshStatus.PendingTrigger = nil
			addToTriggerHistory(freshStatus, cronanything.TriggerHistoryRecord{
				ScheduleTime:      metav1.NewTime(scheduleTime),
				CreationTimestamp: metav1.NewTime(now),
				Result:            cronanything.TriggerResultResourceLimitReached,
			})
		})
		if err != nil {
			return reconcile.Result{}, err
		}
		return r.updateTriggerTimes(canonicalName, nextScheduleTime), nil
	}

	// Prepare the child object: name it for this schedule time and label it with the creating
	// CronAnything and the schedule time (Unix seconds).
	name := getResourceName(instance, scheduleTime)
	crt.SetName(name)

	labels := crt.GetLabels()
	if labels == nil {
		labels = make(map[string]string)
	}
	labels[cronanything.CronAnythingCreatedByLabel] = instance.GetName()
	labels[cronanything.CronAnythingScheduleTimeLabel] = strconv.FormatInt(scheduleTime.Unix(), 10)
	crt.SetLabels(labels)

	// With CascadeDelete enabled, set the instance as the controller owner of the child.
	if instance.CronAnythingSpec().CascadeDelete != nil && *instance.CronAnythingSpec().CascadeDelete {
		crt.SetOwnerReferences([]metav1.OwnerReference{*metav1.NewControllerRef(instance, controllerKind)})
	}

	err = r.resourceControl.Create(cgvr, instance.GetNamespace(), crt)
	if err != nil {
		// Record the failed attempt as a pending trigger. A failure to update the status is only
		// logged so that the original create error is the one returned.
		statusErr := r.updateCronAnythingStatus(instance.GetName(), instance.GetNamespace(), func(freshStatus *cronanything.CronAnythingStatus) {
			freshStatus.PendingTrigger = &cronanything.PendingTrigger{
				ScheduleTime: metav1.NewTime(scheduleTime),
				Result:       cronanything.TriggerResultCreateFailed,
			}
		})
		if statusErr != nil {
			log.Error(statusErr, "Failed to update status for CronAnything after failed create attempt", "caName", canonicalName)
		}
		r.eventRecorder.Eventf(instance, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
		return reconcile.Result{}, err
	}

	log.Info("Created new resource", "caName", canonicalName, "childName", name)
	r.eventRecorder.Eventf(instance, v1.EventTypeNormal, "SuccessfulCreate", "Created resource %s", name)

	err = r.updateCronAnythingStatus(instance.GetName(), instance.GetNamespace(), func(freshStatus *cronanything.CronAnythingStatus) {
		updateLastScheduleTime(freshStatus, scheduleTime)
		freshStatus.PendingTrigger = nil
		addToTriggerHistory(freshStatus, cronanything.TriggerHistoryRecord{
			ScheduleTime:      metav1.NewTime(scheduleTime),
			CreationTimestamp: metav1.NewTime(now),
			Result:            cronanything.TriggerResultCreateSucceeded,
		})
	})
	if err != nil {
		return reconcile.Result{}, err
	}

	return r.updateTriggerTimes(canonicalName, nextScheduleTime), nil
}