oracle/controllers/instancecontroller/instance_controller_patching.go (363 lines of code) (raw):
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package instancecontroller
import (
"context"
"errors"
"fmt"
"reflect"
"time"
"github.com/go-logr/logr"
snapv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
log "k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/api/v1alpha1"
"github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/controllers"
"github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/pkg/k8s"
)
const (
// deploymentPatchingTimeout bounds how long patchingStateMachine may stay in
// the DeploymentSetPatchingInProgress or DeploymentSetPatchingRollbackInProgress
// state before the step is treated as timed out.
deploymentPatchingTimeout = 3 * time.Minute
)
// statefulSetImages lists the image-map keys that isStatefulSetPatchingRequired
// compares between the active and the requested images.
var statefulSetImages = []string{"service", "dbinit", "logging_sidecar"}
// patchingStateMachine runs one step of the instance patching workflow. The
// step is selected by the Reason of the instance Ready condition
// (instanceReadyCond). A step either upserts the next Reason on inst.Status and
// asks for a requeue, waits via RequeueAfter, or records a failure Reason.
//
// State transition:
// Happy case
// CreateComplete -> PatchingBackupStarted -> DeploymentSetPatchingInProgress -> DeploymentSetPatchingComplete
// -> StatefulSetPatchingInProgress -> StatefulSetPatchingComplete
// -> DatabasePatchingInProgress -> DatabasePatchingComplete -> CreateComplete
//
// Unhappy case (an asterisk means the state can be short-circuited due to failures in the parent state)
// CreateComplete -> PatchingBackupStarted
// -> DeploymentSetPatchingInProgress* -> DeploymentSetPatchingComplete*
// -> StatefulSetPatchingInProgress* -> StatefulSetPatchingComplete*
// -> DatabasePatchingInProgress* -> DatabasePatchingComplete*
// -> StatefulSetPatchingFailure/DatabasePatchingFailure -> PatchingRecoveryInProgress -> PatchingRecoveryCompleted
//
// A failed or timed-out DeploymentSetPatchingInProgress step goes to
// DeploymentSetPatchingRollbackInProgress instead, which ends in
// PatchingRecoveryCompleted or PatchingRecoveryFailure.
//
// ExportComplete, RestoreComplete, ImportComplete and PatchingRecoveryCompleted
// are accepted as entry states alongside CreateComplete. PatchingBackupFailure
// and PatchingRecoveryFailure have no case of their own and therefore fall
// through to the default branch.
//
// Parameters worth noting
// * dbInstanceCond is only checked for nil in this function.
// * databasePatchingTimeout bounds both the StatefulSetPatchingInProgress and
// the DatabasePatchingInProgress states.
// * config is not referenced in this function.
//
// Returns
// * non-empty result if the patching state machine needs another reconcile
// * non-empty error if any error occurred
// * bool: false when the conditions are not initialized yet or the Reason is
// not one handled here (both paths log that the main reconciliation should
// proceed); true on every other return, including the ones that carry an
// empty result and a nil error.
// NOTE(review): how the caller consumes the bool is not visible here — confirm.
func (r *InstanceReconciler) patchingStateMachine(req ctrl.Request, instanceReadyCond *v1.Condition, dbInstanceCond *v1.Condition, inst *v1alpha1.Instance, ctx context.Context, stsParams *controllers.StsParams, config *v1alpha1.Config, databasePatchingTimeout time.Duration, log logr.Logger) (ctrl.Result, error, bool) {
// Conditions not initialized yet
if instanceReadyCond == nil || dbInstanceCond == nil {
log.Info("patchingStateMachine: Instance not ready yet, proceed with main reconciliation")
return ctrl.Result{}, nil, false
}
switch instanceReadyCond.Reason {
case k8s.CreateComplete, k8s.ExportComplete, k8s.RestoreComplete, k8s.ImportComplete, k8s.PatchingRecoveryCompleted:
// PatchingRecoveryCompleted is also a stable state and we need this check to avoid the infinite loop of retrying Patching with failed images.
if instanceReadyCond.Reason == k8s.PatchingRecoveryCompleted && reflect.DeepEqual(inst.Spec.Images, inst.Status.LastFailedImages) {
return ctrl.Result{}, nil, true
}
inst.Status.CurrentActiveStateMachine = "PatchingStateMachine"
// The result returned by startPatchingBackup is only forwarded on error; on
// success it is dropped and an immediate requeue is requested below.
if result, err := r.startPatchingBackup(req, ctx, inst, log); err != nil {
// In case of k8s conflict retry, otherwise switch to failed state
if !apierrors.IsConflict(err) {
k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionFalse, k8s.PatchingBackupFailure, "")
}
return result, err, true
}
log.Info("patchingStateMachine: CreateComplete->PatchingBackupStarted")
k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionFalse, k8s.PatchingBackupStarted, "Patching Backup Started")
return ctrl.Result{Requeue: true}, nil, true
case k8s.PatchingBackupStarted:
// Poll the backup: an error fails the backup, "not completed" re-checks in 30 seconds.
completed, err := r.isPatchingBackupCompleted(ctx, *inst)
if err != nil {
k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionFalse, k8s.PatchingBackupFailure, "")
return ctrl.Result{}, err, true
} else if !completed {
return ctrl.Result{RequeueAfter: 30 * time.Second}, nil, true
}
log.Info("patchingStateMachine: PatchingBackupStarted->DeploymentSetPatchingInProgress")
k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionFalse, k8s.DeploymentSetPatchingInProgress, "Patching backup completed, continuing patching")
return ctrl.Result{Requeue: true}, nil, true
case k8s.DeploymentSetPatchingInProgress:
// Give up and start the rollback once this state has lasted longer than deploymentPatchingTimeout.
// NOTE(review): presumably elapsed is measured from the Ready condition's last transition time — confirm against the k8s helper.
elapsed := k8s.ElapsedTimeFromLastTransitionTime(instanceReadyCond, time.Second)
if elapsed > deploymentPatchingTimeout {
msg := fmt.Sprintf("agentPatchingStateMachine: Agent patching timed out after %v", deploymentPatchingTimeout)
log.Info(msg)
r.Recorder.Eventf(inst, corev1.EventTypeWarning, "InstanceReady", msg)
log.Info("agentPatchingStateMachine: DeploymentSetPatchingInProgress->DeploymentSetPatchingRollbackInProgress")
k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionFalse, k8s.DeploymentSetPatchingRollbackInProgress, msg)
return ctrl.Result{}, errors.New(msg), true
}
// TODO: Reconcile other agents if we add them
// Reconcile monitoring with the images from stsParams.
// NOTE(review): this passes r.Log rather than the log parameter — confirm that is intended.
res, err := r.reconcileMonitoring(ctx, inst, r.Log, stsParams.Images)
if err != nil {
log.Info("agentPatchingStateMachine: DeploymentSetPatchingInProgress->DeploymentSetPatchingRollbackInProgress")
k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionFalse, k8s.DeploymentSetPatchingRollbackInProgress, "")
return ctrl.Result{}, err, true
}
// A positive RequeueAfter from reconcileMonitoring keeps the state unchanged and is passed through.
if res.RequeueAfter > 0 {
return res, nil, true
}
log.Info("agentPatchingStateMachine: DeploymentSetPatchingInProgress->DeploymentSetPatchingComplete")
k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionFalse, k8s.DeploymentSetPatchingComplete, "")
return ctrl.Result{Requeue: true}, nil, true
case k8s.DeploymentSetPatchingComplete:
// We know Deployment patching is complete, check status of Oracle
oracleRunning, err := r.isOracleUpAndRunning(ctx, inst, req.Namespace, log)
if err != nil {
k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionFalse, k8s.DatabasePatchingFailure, "Failed to check Oracle status")
return ctrl.Result{}, err, true
}
if !oracleRunning {
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil, true
}
// If there are no new images specified with respect to the stateful set skip this state.
if !isStatefulSetPatchingRequired(inst.Status.ActiveImages, inst.Spec.Images) {
k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionFalse, k8s.StatefulSetPatchingComplete, "")
return ctrl.Result{Requeue: true}, nil, true
}
// Start software patching
// inst is passed by value (*inst); only the returned error is used here, and
// the conditions are upserted on inst.Status below.
if _, err, _ := r.startStatefulSetPatching(req, ctx, *inst, stsParams, log); err != nil {
k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionFalse, k8s.StatefulSetPatchingFailure, "")
return ctrl.Result{}, err, true
}
log.Info("patchingStateMachine: DeploymentSetPatchingComplete->StatefulSetPatchingInProgress")
k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionFalse, k8s.StatefulSetPatchingInProgress, "")
return ctrl.Result{Requeue: true}, nil, true
case k8s.DeploymentSetPatchingRollbackInProgress:
// The rollback is bounded by the same deploymentPatchingTimeout; exceeding it is a recovery failure.
elapsed := k8s.ElapsedTimeFromLastTransitionTime(instanceReadyCond, time.Second)
if elapsed > deploymentPatchingTimeout {
msg := fmt.Sprintf("agentPatchingStateMachine: Agent patching timed out after %v", deploymentPatchingTimeout)
log.Info(msg)
k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionFalse, k8s.PatchingRecoveryFailure, msg)
return ctrl.Result{}, errors.New(msg), true
}
// TODO: Reconcile other agents if we add them
// Roll back by reconciling monitoring with the currently active images.
res, err := r.reconcileMonitoring(ctx, inst, r.Log, inst.Status.ActiveImages)
if err != nil {
log.Info("agentPatchingStateMachine: DeploymentSetPatchingRollbackInProgress->PatchingRecoveryFailure")
k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionFalse, k8s.PatchingRecoveryFailure, "")
return ctrl.Result{}, err, true
}
if res.RequeueAfter > 0 {
return res, nil, true
}
// NOTE(review): unlike the PatchingRecoveryInProgress path, this transition keeps Ready at
// ConditionFalse and does not set LastFailedImages or clear CurrentActiveStateMachine — confirm.
k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionFalse, k8s.PatchingRecoveryCompleted, "")
return ctrl.Result{Requeue: true}, nil, true
case k8s.StatefulSetPatchingInProgress:
// Track software patching runtime and terminate if its running beyond timeout interval
elapsed := k8s.ElapsedTimeFromLastTransitionTime(instanceReadyCond, time.Second)
if elapsed > databasePatchingTimeout {
msg := fmt.Sprintf("Software patching timed out after %v", databasePatchingTimeout)
log.Info(msg)
r.Recorder.Eventf(inst, corev1.EventTypeWarning, "InstanceReady", msg)
k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionFalse, k8s.StatefulSetPatchingFailure, msg)
return ctrl.Result{}, errors.New(msg), true
}
// Monitor patching progress
// NOTE(review): the log text below says 30 seconds but the requeue interval is 10 seconds.
if !r.updateProgressCondition(ctx, *inst, req.NamespacedName.Namespace, k8s.StatefulSetPatchingInProgress, log) {
log.Info("waiting for STS creation to complete: requeue after 30 seconds")
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil, true
}
log.Info("patchingStateMachine: StatefulSetPatchingInProgress->StatefulSetPatchingComplete")
k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionFalse, k8s.StatefulSetPatchingComplete, "")
return ctrl.Result{Requeue: true}, nil, true
case k8s.StatefulSetPatchingComplete:
// We know STS is up, check status of Oracle
oracleRunning, err := r.isOracleUpAndRunning(ctx, inst, req.Namespace, log)
if err != nil {
k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionFalse, k8s.DatabasePatchingFailure, "Failed to check Oracle status")
return ctrl.Result{}, err, true
}
if !oracleRunning {
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil, true
}
// Start patching
if err := r.startDatabasePatching(req, ctx, *inst, log); err != nil {
k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionFalse, k8s.DatabasePatchingFailure, "Failed to start database patching")
return ctrl.Result{}, err, true
}
log.Info("patchingStateMachine: StatefulSetPatchingComplete->DatabasePatchingInProgress")
k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionFalse, k8s.DatabasePatchingInProgress, "Calling ApplyDataPatch()")
return ctrl.Result{Requeue: true}, nil, true
case k8s.DatabasePatchingInProgress:
// Track database patching runtime and terminate if its running beyond timeout interval
elapsed := k8s.ElapsedTimeFromLastTransitionTime(instanceReadyCond, time.Second)
if elapsed > databasePatchingTimeout {
msg := fmt.Sprintf("Database patching timed out after %v", databasePatchingTimeout)
log.Info(msg)
r.Recorder.Eventf(inst, corev1.EventTypeWarning, "InstanceReady", msg)
k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionFalse, k8s.DatabasePatchingFailure, msg)
return ctrl.Result{}, errors.New(msg), true
}
// Monitor patching progress
done, err := r.isDatabasePatchingDone(ctx, req, *inst, log)
if err != nil {
k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionFalse, k8s.DatabasePatchingFailure, "Failed to check datapatch status")
return ctrl.Result{}, err, true
}
if !done {
log.Info("datapatch still in progress, waiting")
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil, true
}
log.Info("patchingStateMachine: DatabasePatchingInProgress->DatabasePatchingComplete")
// NOTE(review): the condition message repeats "Calling ApplyDataPatch()" from the
// in-progress transition although patching has finished at this point.
k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionFalse, k8s.DatabasePatchingComplete, "Calling ApplyDataPatch()")
return ctrl.Result{Requeue: true}, nil, true
case k8s.DatabasePatchingComplete:
// Final happy-path step: mark the instance Ready again, record the images
// now in use and release the state machine marker.
log.Info("patchingStateMachine: DatabasePatchingComplete->CreateComplete")
k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionTrue, k8s.CreateComplete, "")
// Update current service image path
inst.Status.ActiveImages = cloneMap(stsParams.Images)
inst.Status.CurrentActiveStateMachine = ""
// NOTE(review): the log prints inst.Spec.Images while ActiveImages is set from stsParams.Images.
log.Info("patchingStateMachine: patching done", "updating CurrentServiceImage", inst.Spec.Images)
return ctrl.Result{}, nil, true
case k8s.StatefulSetPatchingFailure, k8s.DatabasePatchingFailure:
// Remove old STS/PVC so we can recover.
// NOTE(review): the log line in the error branch names PatchingRecoveryInProgress as
// the source state although the current state is one of the *PatchingFailure states.
if done, err := r.deleteOldSTSandPVCs(ctx, *inst, *stsParams, r.Log); err != nil {
log.Info("patchingStateMachine: PatchingRecoveryInProgress->PatchingRecoveryFailure")
k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionFalse, k8s.PatchingRecoveryFailure, "Failed to restore from snapshot after patching failure")
return ctrl.Result{}, err, true
} else if !done {
r.Log.Info("STS/PVC removal in progress, waiting")
return ctrl.Result{Requeue: true}, nil, true
}
k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionFalse, k8s.PatchingRecoveryInProgress, "Restoring snapshot due to patching failure")
log.Info("patchingStateMachine: XXXPatchingFailure->PatchingRecoveryInProgress")
return ctrl.Result{Requeue: true}, nil, true
case k8s.PatchingRecoveryInProgress:
// always retry recoverFromPatchingFailure to keep STS correct
// in case we flipflop between states.
if err := r.recoverFromPatchingFailure(ctx, *inst, stsParams); err != nil {
return ctrl.Result{}, err, true
}
if complete := r.isRecoveryFromPatchingFailureComplete(req, ctx, *inst); !complete {
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil, true
}
// The Oracle status check gets its own 5 second deadline; the deferred cancel
// runs when patchingStateMachine returns.
shortCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
oracleRunning, err := r.isOracleUpAndRunning(shortCtx, inst, req.Namespace, log)
if err != nil {
log.Info("patchingStateMachine: PatchingRecoveryInProgress->PatchingRecoveryFailure")
k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionFalse, k8s.PatchingRecoveryFailure, "Failed to restore from snapshot after patching failure. Could not retrieve status of Oracle")
return ctrl.Result{}, err, true
}
if !oracleRunning {
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil, true
}
// Remember the images that failed so the entry state does not retry patching
// with the same spec (see the reflect.DeepEqual check in the first case).
inst.Status.LastFailedImages = cloneMap(inst.Spec.Images)
log.Info("patchingStateMachine: PatchingRecoveryInProgress->PatchingRecoveryCompleted")
k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionTrue, k8s.PatchingRecoveryCompleted, "Finished restoring from snapshot after patching failure")
inst.Status.CurrentActiveStateMachine = ""
return ctrl.Result{}, nil, true
default:
// Any Reason without a case above (including PatchingBackupFailure and
// PatchingRecoveryFailure) is left to the main reconciliation.
log.Info("patchingStateMachine: no action needed, proceed with main reconciliation")
return ctrl.Result{}, nil, false
}
}
// isStatefulSetPatchingRequired reports whether any image named in
// statefulSetImages differs between currentImages and newImages. A key that is
// missing from a map compares as the empty string; keys outside
// statefulSetImages are ignored.
func isStatefulSetPatchingRequired(currentImages map[string]string, newImages map[string]string) bool {
	for i := 0; i < len(statefulSetImages); i++ {
		key := statefulSetImages[i]
		if currentImages[key] == newImages[key] {
			continue
		}
		// First mismatch is enough to require StatefulSet patching.
		return true
	}
	return false
}
// startPatchingBackup triggers the pre-patching backup via prePatchBackup and,
// on success, stores the returned backup ID in inst.Status.BackupID.
//
// Returns a result with RequeueAfter set to 30 seconds and a nil error on
// success, or an empty result together with the error from prePatchBackup.
// req is not used.
func (r *InstanceReconciler) startPatchingBackup(req ctrl.Request, ctx context.Context, inst *v1alpha1.Instance, log logr.Logger) (ctrl.Result, error) {
	var res ctrl.Result
	id, err := r.prePatchBackup(ctx, *inst)
	if err == nil {
		log.Info("startPatchingBackup: backup id", "backupID", id)
		inst.Status.BackupID = id
		res.RequeueAfter = 30 * time.Second
	}
	return res, err
}
// startStatefulSetPatching replaces the instance StatefulSet: it looks up the
// StatefulSet named stsParams.StsName in req.Namespace, deletes it, builds a
// new one with constructSTSandPVCs and applies it with forced ownership as
// field owner "instance-controller".
//
// Returns a result with RequeueAfter set to 30 seconds, a nil error and true
// on success; an empty result, the error and false on any failure.
//
// NOTE(review): inst is received by value, so the conditions upserted on
// inst.Status here are made on a copy and may not be visible to the caller —
// confirm.
func (r *InstanceReconciler) startStatefulSetPatching(req ctrl.Request, ctx context.Context, inst v1alpha1.Instance, stsParams *controllers.StsParams, log logr.Logger) (ctrl.Result, error, bool) {
	log.Info("startStatefulSetPatching, enter")

	var noResult ctrl.Result

	// Remove the StatefulSet that is currently deployed.
	current, err := r.retrieveStatefulSetByName(ctx, req.Namespace, stsParams.StsName)
	if err != nil {
		r.Log.Error(err, "failed to retrieve StatefulSet")
		return noResult, err, false
	}
	if err = r.Delete(ctx, current); err != nil {
		k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionFalse, k8s.StatefulSetPatchingFailure, "Error while deleting STS")
		return noResult, err, false
	}

	// Build the replacement from the instance and stsParams, then apply it.
	err, desired, _ := r.constructSTSandPVCs(inst, *stsParams, log)
	if err != nil {
		r.Log.Error(err, "failed to create a StatefulSet")
		return noResult, err, false
	}
	if err = r.Patch(ctx, desired, client.Apply, client.ForceOwnership, client.FieldOwner("instance-controller")); err != nil {
		k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionFalse, k8s.StatefulSetPatchingFailure, "Error while creating patched STS")
		r.Log.Error(err, "failed to patch the restored StatefulSet")
		return noResult, err, false
	}

	k8s.InstanceUpsertCondition(&inst.Status, k8s.Ready, v1.ConditionFalse, k8s.StatefulSetPatchingInProgress, "Creating new STS")
	return ctrl.Result{RequeueAfter: 30 * time.Second}, nil, true
}
// recoverFromPatchingFailure starts a restore from the pre-patching snapshot
// identified by inst.Status.BackupID.
//
// inst is a by-value copy: its Spec.Restore is filled in locally and the copy
// is handed to restoreSnapshot. stsParams is a pointer, so the caller's
// stsParams.Images is overwritten with a clone of inst.Status.ActiveImages.
//
// Returns the error from restoreSnapshot.
func (r *InstanceReconciler) recoverFromPatchingFailure(ctx context.Context, inst v1alpha1.Instance, stsParams *controllers.StsParams) error {
	// Log through the structured r.Log: the package-level klog Info function
	// concatenates its arguments fmt.Print-style, which mangled the key/value
	// pairs into a single string.
	r.Log.Info("Restoring from a snapshot due to patching failure", "backupID", inst.Status.BackupID)
	inst.Spec.Restore = buildRestoreSpecUsingSnapshotBackupID(inst.Status.BackupID)
	// Restore with the images that were active before patching started.
	stsParams.Images = cloneMap(inst.Status.ActiveImages)
	r.Log.Info("recoverFromPatchingFailure: stsparams", "images", stsParams.Images)
	// Start restore process
	return r.restoreSnapshot(ctx, inst, *stsParams, r.Log)
}
// isRecoveryFromPatchingFailureComplete checks the progress of the restore
// operation by calling updateProgressCondition for the
// PatchingRecoveryInProgress state. It returns true if the restore has
// completed and false (after logging) otherwise.
func (r *InstanceReconciler) isRecoveryFromPatchingFailureComplete(req ctrl.Request, ctx context.Context, inst v1alpha1.Instance) bool {
	finished := r.updateProgressCondition(ctx, inst, req.Namespace, k8s.PatchingRecoveryInProgress, r.Log)
	if !finished {
		log.Info("Recovery from patching failure still in progress...")
	}
	return finished
}
// retrieveStatefulSetByName fetches the StatefulSet called stsName from
// namespace ns. It returns a pointer to the StatefulSet, or nil and the error
// from the client Get call (which is also logged).
func (r *InstanceReconciler) retrieveStatefulSetByName(ctx context.Context, ns, stsName string) (*appsv1.StatefulSet, error) {
	key := types.NamespacedName{Namespace: ns, Name: stsName}
	foundSts := &appsv1.StatefulSet{}
	if err := r.Get(ctx, key, foundSts); err != nil {
		// Structured logging takes alternating keys and values; the previous
		// call used the text " in namespace " as a key.
		r.Log.Error(err, "Failed to retrieve statefulSet", "stsName", stsName, "namespace", ns)
		return nil, err
	}
	return foundSts, nil
}
// buildRestoreSpecUsingSnapshotBackupID returns a RestoreSpec of backup type
// "Snapshot" for the given backupID, with RequestTime set to the current time.
// The remaining fields are dummies and are left at their zero values.
func buildRestoreSpecUsingSnapshotBackupID(backupID string) *v1alpha1.RestoreSpec {
	spec := new(v1alpha1.RestoreSpec)
	spec.BackupType = "Snapshot"
	spec.BackupID = backupID
	// Dummy values: only the backup type, ID and request time matter here.
	spec.Dop = 0
	spec.TimeLimitMinutes = 0
	spec.Force = false
	spec.RequestTime = v1.Now()
	return spec
}
// cloneMap returns a new map holding the same key/value pairs as source.
// The result is never nil, even when source is nil or empty, and later
// changes to either map do not affect the other.
func cloneMap(source map[string]string) map[string]string {
	out := make(map[string]string, len(source))
	for k := range source {
		out[k] = source[k]
	}
	return out
}
// prePatchBackup requests one volume snapshot per disk in inst.Spec.Disks
// before patching starts.
//
// The backup ID is "patching-backup-" followed by the instance name, the
// current date (YYYYMMDD) and the current nanosecond value, concatenated
// without separators. Each snapshot is named "<backupID>-<mount>" and is
// applied with forced ownership as field owner "instance-controller".
//
// Returns the backup ID, or an empty string and the first error from building
// or applying a snapshot. Snapshots applied before the failing one are not
// removed here.
func (r *InstanceReconciler) prePatchBackup(ctx context.Context, inst v1alpha1.Instance) (string, error) {
	// do the same for db instance
	// TODO: these snapshots should get cleaned up at some point
	backupID := fmt.Sprint("patching-backup-", inst.Name, time.Now().Format("20060102"), time.Now().Nanosecond())
	// FIXME: this should not be hard coded
	const snapshotClass = "csi-gce-pd-snapshot-class"
	logger := r.Log.WithValues("Instance", inst.Name)
	// The StatefulSet name is the same for every disk, so format it once.
	stsName := fmt.Sprintf(controllers.StsName, inst.Name)

	for _, disk := range inst.Spec.Disks {
		var pvcPrefix, mount string
		if !controllers.IsReservedDiskName(disk.Name) {
			pvcPrefix, mount = controllers.GetCustomPVCNameAndMount(&inst, disk.Name)
		} else {
			pvcPrefix, mount = controllers.GetPVCNameAndMount(inst.Name, disk.Name)
		}

		pvcName := fmt.Sprintf("%s-%s-0", pvcPrefix, stsName)
		snapshotName := fmt.Sprintf("%s-%s", backupID, mount)

		snap, err := controllers.NewSnapshotInst(&inst, r.SchemeVal, pvcName, snapshotName, snapshotClass)
		if err != nil {
			return "", err
		}
		logger.V(1).Info("new PatchingBackupSnapshot", "backup", snap)

		if err = r.Patch(ctx, snap, client.Apply, client.ForceOwnership, client.FieldOwner("instance-controller")); err != nil {
			return "", err
		}
	}
	return backupID, nil
}
// isPatchingBackupCompleted reports whether every pre-patching snapshot of the
// instance is ready to use. For each disk in inst.Spec.Disks it looks up the
// VolumeSnapshot named "<inst.Status.BackupID>-<mount>" in the instance
// namespace.
//
// Returns
// * (true, nil) when every snapshot reports ReadyToUse == true (also when the
// instance has no disks)
// * (false, nil) when a snapshot has no status yet, or its ReadyToUse is
// unset or false
// * (false, err) when building the snapshot object or fetching the snapshot
// fails, or when a snapshot status carries an error message
func (r *InstanceReconciler) isPatchingBackupCompleted(ctx context.Context, inst v1alpha1.Instance) (bool, error) {
	backupID := inst.Status.BackupID
	vsc := "csi-gce-pd-snapshot-class"
	log := r.Log.WithValues("Instance", inst.Name)
	for _, diskSpec := range inst.Spec.Disks {
		var shortPVCName, mount string
		if controllers.IsReservedDiskName(diskSpec.Name) {
			shortPVCName, mount = controllers.GetPVCNameAndMount(inst.Name, diskSpec.Name)
		} else {
			shortPVCName, mount = controllers.GetCustomPVCNameAndMount(&inst, diskSpec.Name)
		}
		fullPVCName := fmt.Sprintf("%s-%s-0", shortPVCName, fmt.Sprintf(controllers.StsName, inst.Name))
		snapshotName := fmt.Sprintf("%s-%s", backupID, mount)
		// The snapshot object is rebuilt only for the verbose log line below;
		// the lookup itself needs just the snapshot name.
		bk, err := controllers.NewSnapshotInst(&inst, r.SchemeVal, fullPVCName, snapshotName, vsc)
		if err != nil {
			return false, err
		}
		log.V(1).Info("new Backup/Snapshot resource", "backup", bk)

		name := types.NamespacedName{
			Namespace: inst.Namespace,
			Name:      snapshotName,
		}
		snapshot := snapv1.VolumeSnapshot{}
		if err := r.Get(ctx, name, &snapshot); err != nil {
			return false, err
		}
		status := snapshot.Status
		if status == nil {
			// Status not populated yet: keep waiting.
			return false, nil
		}
		if status.Error != nil && status.Error.Message != nil {
			return false, fmt.Errorf("Snapshot Error: %s", *status.Error.Message)
		}
		// An unset ReadyToUse means readiness is unknown, so it must not be
		// counted as ready; only an explicit true lets the backup complete.
		if status.ReadyToUse == nil || !*status.ReadyToUse {
			return false, nil
		}
	}
	return true, nil
}