in pkg/mimo/actuator/manager.go [69:258]
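// Process fetches the queued maintenance manifests for this cluster, marks any
// whose RunBefore deadline has passed as timed out, and runs the remainder
// under a lease on the cluster document. It returns true if manifests were
// actioned, and false if there was nothing to do or the cluster could not be
// dequeued.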
func (a *actuator) Process(ctx context.Context) (bool, error) {
// Get the manifests for this cluster that need to be worked on
i, err := a.mmf.GetQueuedByClusterResourceID(ctx, a.clusterResourceID, "")
if err != nil {
err = fmt.Errorf("failed getting manifests: %w", err)
a.log.Error(err)
return false, err
}
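// Drain the iterator into a slice; Next returns nil once every page has been read.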
docList := make([]*api.MaintenanceManifestDocument, 0)
for {
docs, err := i.Next(ctx, -1)
if err != nil {
err = fmt.Errorf("failed reading next manifest document: %w", err)
a.log.Error(err)
return false, err
}
if docs == nil {
break
}
docList = append(docList, docs.MaintenanceManifestDocuments...)
}
manifestsToAction := make([]*api.MaintenanceManifestDocument, 0)
// Sort manifests by RunAfter, then by Priority for manifests with the same
// RunAfter.
sort.SliceStable(docList, func(i, j int) bool {
if docList[i].MaintenanceManifest.RunAfter == docList[j].MaintenanceManifest.RunAfter {
return docList[i].MaintenanceManifest.Priority < docList[j].MaintenanceManifest.Priority
}
return docList[i].MaintenanceManifest.RunAfter < docList[j].MaintenanceManifest.RunAfter
})
evaluationTime := a.now()
// First, check for manifests that have timed out
for _, doc := range docList {
if evaluationTime.After(time.Unix(int64(doc.MaintenanceManifest.RunBefore), 0)) {
// timed out, mark as such
a.log.Infof("marking %v as outdated: %v older than %v", doc.ID, doc.MaintenanceManifest.RunBefore, evaluationTime.UTC())
_, err := a.mmf.Patch(ctx, a.clusterResourceID, doc.ID, func(d *api.MaintenanceManifestDocument) error {
d.MaintenanceManifest.State = api.MaintenanceManifestStateTimedOut
d.MaintenanceManifest.StatusText = fmt.Sprintf("timed out at %s", evaluationTime.UTC())
return nil
})
if err != nil {
a.log.Error(fmt.Errorf("failed to patch manifest %s with state TimedOut; will still attempt to process other manifests: %w", doc.ID, err))
}
} else {
// not timed out, so add it to the list of manifests to action
manifestsToAction = append(manifestsToAction, doc)
}
}
// Nothing to do, don't dequeue
if len(manifestsToAction) == 0 {
return false, nil
}
a.log.Infof("Processing %d manifests", len(manifestsToAction))
// Dequeue the cluster document
oc, err := a.oc.Get(ctx, a.clusterResourceID)
if err != nil {
return false, fmt.Errorf("failed getting cluster document: %w", err)
}
oc, err = a.oc.DoDequeue(ctx, oc)
if err != nil {
return false, fmt.Errorf("failed dequeuing cluster document: %w", err) // This will include StatusPreconditionFaileds
}
// Mark the maintenance state as unplanned and put the cluster into AdminUpdating
a.log.Infof("Marking cluster as in AdminUpdating")
oc, err = a.oc.PatchWithLease(ctx, a.clusterResourceID, func(oscd *api.OpenShiftClusterDocument) error {
oscd.OpenShiftCluster.Properties.LastProvisioningState = oscd.OpenShiftCluster.Properties.ProvisioningState
oscd.OpenShiftCluster.Properties.ProvisioningState = api.ProvisioningStateAdminUpdating
oscd.OpenShiftCluster.Properties.MaintenanceState = api.MaintenanceStateUnplanned
return nil
})
if err != nil {
err = fmt.Errorf("failed setting provisioning state on cluster document: %w", err)
a.log.Error(err)
// attempt to end the lease on the cluster document early, for what it's worth
_, leaseErr := a.oc.EndLease(ctx, a.clusterResourceID, oc.OpenShiftCluster.Properties.LastProvisioningState, oc.OpenShiftCluster.Properties.FailedProvisioningState, nil)
if leaseErr != nil {
return false, fmt.Errorf("failed ending lease early on cluster document: %w", leaseErr)
}
return false, err
}
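// Build the task context shared by every manifest handler; it carries the environment, the logger and the current cluster document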
taskContext := newTaskContext(ctx, a.env, a.log, oc)
// Run the registered task for each manifest we want to action
for _, doc := range manifestsToAction {
taskLog := a.log.WithFields(logrus.Fields{
"manifestID": doc.ID,
"taskID": doc.MaintenanceManifest.MaintenanceTaskID,
})
taskLog.Info("begin processing manifest")
// Attempt to lease the manifest
doc, err = a.mmf.Lease(ctx, a.clusterResourceID, doc.ID)
if err != nil {
// log and continue to the next manifest if the lease cannot be taken
taskLog.Error(fmt.Errorf("failed leasing manifest: %w", err))
continue
}
// If the task ID is not registered, fail the manifest and continue
f, ok := a.tasks[doc.MaintenanceManifest.MaintenanceTaskID]
if !ok {
taskLog.Errorf("task ID %v is not registered", doc.MaintenanceManifest.MaintenanceTaskID)
msg := "task ID not registered"
_, err = a.mmf.EndLease(ctx, doc.ClusterResourceID, doc.ID, api.MaintenanceManifestStateFailed, &msg)
if err != nil {
taskLog.Error(fmt.Errorf("failed ending lease early on manifest: %w", err))
}
continue
}
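// state and msg capture the outcome of this task; they are persisted when the lease on the manifest is ended below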
var state api.MaintenanceManifestState
var msg string
taskLog.Info("executing manifest")
// Perform the task with a timeout
err = taskContext.RunInTimeout(time.Minute*60, func() error {
innerErr := f(taskContext, doc, oc)
if innerErr != nil {
return innerErr
}
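// surface a timeout or cancellation recorded on the task context even when the task itself returned nil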
return taskContext.Err()
})
// Pull the result message out of the task context to save, if it is set
msg = taskContext.GetResultMessage()
if err != nil {
if doc.Dequeues >= maxDequeueCount {
msg = fmt.Sprintf("did not succeed after %d times, failing -- %s", doc.Dequeues, err.Error())
state = api.MaintenanceManifestStateRetriesExceeded
taskLog.Error(msg)
} else if utilmimo.IsRetryableError(err) {
// If an error is retryable (i.e. explicitly marked as a transient error
// by wrapping it in utilmimo.TransientError), then mark the manifest back as
// Pending so that it will get picked up and retried.
state = api.MaintenanceManifestStatePending
taskLog.Error(fmt.Errorf("task returned a retryable error: %w", err))
} else {
// Terminal errors (explicitly marked as terminal, or not wrapped at all) fail the manifest
state = api.MaintenanceManifestStateFailed
taskLog.Error(fmt.Errorf("task returned a terminal error: %w", err))
}
} else {
// Tasks that return no error are implicitly marked as completed
state = api.MaintenanceManifestStateCompleted
taskLog.Info("manifest executed successfully")
}
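// Persist the final state and message and release the lease on the manifest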
_, err = a.mmf.EndLease(ctx, doc.ClusterResourceID, doc.ID, state, &msg)
if err != nil {
taskLog.Error(fmt.Errorf("failed ending lease on manifest: %w", err))
}
taskLog.Info("manifest processing complete")
}
// Clear any maintenance state set on the cluster
a.log.Info("removing maintenance state on cluster")
oc, err = a.oc.PatchWithLease(ctx, a.clusterResourceID, func(oscd *api.OpenShiftClusterDocument) error {
oscd.OpenShiftCluster.Properties.MaintenanceState = api.MaintenanceStateNone
return nil
})
if err != nil {
a.log.Error(fmt.Errorf("failed removing maintenance state on cluster document, but continuing: %w", err))
}
// Release the lease on the OpenShiftCluster document
a.log.Info("ending lease on cluster")
_, err = a.oc.EndLease(ctx, a.clusterResourceID, oc.OpenShiftCluster.Properties.LastProvisioningState, oc.OpenShiftCluster.Properties.FailedProvisioningState, nil)
if err != nil {
return false, fmt.Errorf("failed ending lease on cluster document: %w", err)
}
return true, nil
}