in pkg/controller/controller.go [497:597]
func (c *FrameworkController) syncFramework(key string) (returnedErr error) {
startTime := time.Now()
logPfx := fmt.Sprintf("[%v]: syncFramework: ", key)
klog.Infof(logPfx + "Started")
defer func() {
if returnedErr != nil {
// returnedErr is already prefixed with logPfx
klog.Warning(returnedErr.Error())
klog.Warning(logPfx +
"Failed to due to Platform Transient Error. " +
"Will enqueue it again after rate limited delay")
}
klog.Infof(logPfx+"Completed: Duration %v", time.Since(startTime))
}()
fNamespace, fName := ci.SplitFrameworkKey(key)
localF, err := c.fLister.Frameworks(fNamespace).Get(fName)
if err != nil {
if apiErrors.IsNotFound(err) {
// GarbageCollectionController will handle the dependent object
// deletion according to the ownerReferences.
klog.Infof(logPfx+
"Skipped: Framework cannot be found in local cache: %v", err)
c.deleteExpectedFrameworkStatusInfo(key)
return nil
} else {
return fmt.Errorf(logPfx+
"Failed: Framework cannot be got from local cache: %v", err)
}
} else {
f := localF.DeepCopy()
// From now on, we only sync this f instance which is identified by its UID
// instead of its name, and the f is a writable copy of the original local
// cached one, and it may be different from the original one.
klog.Infof(logPfx+"UID %v", f.UID)
expected := c.getExpectedFrameworkStatusInfo(f.Key())
if expected == nil || expected.uid != f.UID {
if f.Status != nil {
// Recover f related things, since it is the first time we see it and
// its Status is not nil.
// No need to recover previous enqueued items, because the Informer has
// already delivered the Add events for all recovered Frameworks which
// caused all Frameworks will be enqueued to sync.
// No need to recover previous scheduled to enqueue items, because the
// schedule will be recovered during sync.
}
// f.Status must be the same as the remote one, since it is the first
// time we see it.
c.updateExpectedFrameworkStatusInfo(f.Key(), f.Status, f.UID, true)
} else {
// f.Status may be outdated, so override it with the expected one, to
// ensure the Framework.Status is Monotonically Exposed.
f.Status = expected.status
// Ensure the expected Framework.Status is the same as the remote one
// before sync.
if !expected.remoteSynced {
c.compressFramework(f)
updateErr := c.updateRemoteFrameworkStatus(f)
c.updateExpectedFrameworkStatusInfo(f.Key(), f.Status, f.UID, updateErr == nil)
if updateErr != nil {
return updateErr
}
}
}
// At this point, f.Status is the same as the expected and remote
// Framework.Status, so it is ready to sync against f.Spec and other
// related objects.
decompressErr := c.decompressFramework(f)
if decompressErr != nil {
return decompressErr
}
remoteRawF := f.DeepCopy()
errs := []error{}
syncErr := c.syncFrameworkStatus(f)
errs = append(errs, syncErr)
if !reflect.DeepEqual(remoteRawF.Status, f.Status) {
// Always update the expected and remote Framework.Status even if sync
// error, since f.Status should never be corrupted due to any Platform
// Transient Error, so no need to rollback to the one before sync, and
// no need to DeepCopy between f.Status and the expected one.
c.compressFramework(f)
updateErr := c.updateRemoteFrameworkStatus(f)
c.updateExpectedFrameworkStatusInfo(f.Key(), f.Status, f.UID, updateErr == nil)
errs = append(errs, updateErr)
} else {
klog.Infof(logPfx +
"Skip to update the expected and remote Framework.Status since " +
"they are unchanged")
}
return errorAgg.NewAggregate(errs)
}
}