func()

in controllers/controller.go [218:385]


// reconcile performs one reconciliation pass for etcdCluster, which belongs to cluster.
//
// The pass runs the following steps in order and returns as soon as one of them
// decides the reconciliation must stop or be requeued:
//
//  1. Reconcile the external infrastructure template reference.
//  2. List the etcd machines of the cluster and separate the ones owned by
//     etcdCluster from the ones that are not. Unowned machines are treated as
//     outdated members; they are removed only once the
//     ControlPlaneUpgradeCompletedAnnotation is present on etcdCluster.
//  3. Aggregate the owned machines' conditions into EtcdMachinesReadyCondition.
//  4. If owned machines need a rollout, continue the rolling upgrade; otherwise
//     mark a previously started upgrade as complete and regenerate the CA/client
//     cert secrets when EtcdCertificatesAvailableCondition is false.
//  5. Initialize, scale up or scale down the etcd cluster by comparing the number
//     of owned machines with Spec.Replicas.
//
// An error is returned when any helper fails or when Spec.Replicas is unset.
func (r *EtcdadmClusterReconciler) reconcile(ctx context.Context, etcdCluster *etcdv1.EtcdadmCluster, cluster *clusterv1.Cluster) (ctrl.Result, error) {
	log := r.Log.WithName(etcdCluster.Name)

	// Reconcile the external infrastructure reference.
	if err := r.reconcileExternalReference(ctx, cluster, etcdCluster.Spec.InfrastructureTemplate); err != nil {
		return ctrl.Result{}, err
	}

	etcdMachines, err := collections.GetFilteredMachinesForCluster(ctx, r.uncachedClient, cluster, EtcdClusterMachines(cluster.Name, etcdCluster.Name))
	if err != nil {
		return ctrl.Result{}, errors.Wrap(err, "Error filtering machines for etcd cluster")
	}

	ownedMachines := etcdMachines.Filter(collections.OwnedMachines(etcdCluster))

	ep, err := NewEtcdPlane(ctx, r.Client, cluster, etcdCluster, ownedMachines)
	if err != nil {
		return ctrl.Result{}, errors.Wrap(err, "Error initializing internal object EtcdPlane")
	}

	if len(ownedMachines) != len(etcdMachines) {
		// These are the out-of-date etcd machines that still belong to the current etcd cluster as etcd members,
		// but are not owned by the EtcdadmCluster object.
		// When upgrading a cluster, etcd machines need to be upgraded first so that the new etcd endpoints become
		// available. But the outdated controlplane machines will keep trying to connect to the etcd members they
		// were configured with, so the older etcd members cannot be deleted until the controlplane rollout has
		// finished. This situation is therefore only possible after an upgrade, and these machines can be deleted
		// only after the controlplane upgrade has finished.
		outdatedMachines := etcdMachines.Difference(ownedMachines)

		if conditions.IsUnknown(etcdCluster, etcdv1.EtcdClusterHasNoOutdatedMembersCondition) || conditions.IsTrue(etcdCluster, etcdv1.EtcdClusterHasNoOutdatedMembersCondition) {
			conditions.MarkFalse(etcdCluster, etcdv1.EtcdClusterHasNoOutdatedMembersCondition, etcdv1.EtcdClusterHasOutdatedMembersReason, clusterv1.ConditionSeverityInfo, "%d etcd members have outdated spec", len(outdatedMachines))
		}

		if _, ok := etcdCluster.Annotations[clusterv1.ControlPlaneUpgradeCompletedAnnotation]; ok {
			log.Info(fmt.Sprintf("Controlplane upgrade has completed, deleting older outdated etcd members: %v", outdatedMachines.Names()))
			for _, outdatedMachine := range outdatedMachines {
				outdatedMachineAddress := getEtcdMachineAddress(outdatedMachine)
				if err := r.removeEtcdMachine(ctx, etcdCluster, cluster, outdatedMachine, outdatedMachineAddress); err != nil {
					return ctrl.Result{}, err
				}
			}
			// requeue so controller reconciles after last machine is deleted and the "EtcdClusterHasNoOutdatedMembersCondition" is marked true
			return ctrl.Result{Requeue: true}, nil
		}
	} else {
		// Every etcd machine is owned by etcdCluster: clear the leftovers of a finished upgrade.
		if _, ok := etcdCluster.Annotations[clusterv1.ControlPlaneUpgradeCompletedAnnotation]; ok {
			log.Info("Outdated etcd members deleted, removing controlplane-upgrade complete annotation")
			delete(etcdCluster.Annotations, clusterv1.ControlPlaneUpgradeCompletedAnnotation)
		}
		if conditions.IsFalse(etcdCluster, etcdv1.EtcdClusterHasNoOutdatedMembersCondition) {
			log.Info(fmt.Sprintf("Outdated etcd members deleted, setting %s to true", etcdv1.EtcdClusterHasNoOutdatedMembersCondition))
			conditions.MarkTrue(etcdCluster, etcdv1.EtcdClusterHasNoOutdatedMembersCondition)
		}
	}

	// This aggregates the state of all machines
	conditions.SetAggregate(etcdCluster, etcdv1.EtcdMachinesReadyCondition, ownedMachines.ConditionGetters(), conditions.AddSourceRef(), conditions.WithStepCounterIf(false))

	numCurrentMachines := len(ownedMachines)
	numAllEtcdMachines := len(etcdMachines)

	// Spec.Replicas is a pointer; fail with an error instead of panicking when it is unset.
	if etcdCluster.Spec.Replicas == nil {
		return ctrl.Result{}, errors.New("etcd cluster spec.replicas is not set")
	}
	desiredReplicas := int(*etcdCluster.Spec.Replicas)

	// Etcd machines rollout due to configuration changes (e.g. upgrades) takes precedence over other operations.
	needRollout := ep.MachinesNeedingRollout()
	numNeedRollout := len(needRollout)

	// A second EtcdPlane built over all etcd machines (owned or not) is used only to count out-of-date machines.
	ep2, err := NewEtcdPlane(ctx, r.Client, cluster, etcdCluster, etcdMachines)
	if err != nil {
		return ctrl.Result{}, errors.Wrap(err, "Error initializing internal object EtcdPlane")
	}
	numOutOfDateMachines := len(ep2.OutOfDateMachines())

	switch {
	case numNeedRollout > 0:
		log.Info("Etcd cluster needs a rollout", "totalMachines", numAllEtcdMachines, "needRollout", numNeedRollout)
		// NOTE: There has been issues with etcd rolling out new machines till infinity. Add an upper limit as a fail safe against this situation.
		maxEtcdMachines := numOutOfDateMachines + desiredReplicas
		if numAllEtcdMachines > maxEtcdMachines {
			log.Info("Cluster has reached the max number of machines, won't create new machines until at least one is deleted", "totalMachines", numAllEtcdMachines)
			// Report the limit that is actually enforced by the check above.
			conditions.MarkFalse(ep.EC, etcdv1.EtcdMachinesSpecUpToDateCondition, etcdv1.MaxNumberOfEtcdMachinesReachedReason, clusterv1.ConditionSeverityWarning, "Etcd cluster has %d total machines, maximum number of machines is %d", numAllEtcdMachines, maxEtcdMachines)
			return ctrl.Result{}, nil
		}
		log.Info("Rolling out Etcd machines", "needRollout", needRollout.Names())
		if conditions.IsFalse(ep.EC, etcdv1.EtcdMachinesSpecUpToDateCondition) && len(ep.UpToDateMachines()) > 0 {
			// update is already in progress, some machines have been rolled out with the new spec
			newestUpToDateMachine := ep.NewestUpToDateMachine()
			newestUpToDateMachineCreationTime := newestUpToDateMachine.CreationTimestamp.Time
			nextMachineUpdateTime := newestUpToDateMachineCreationTime.Add(time.Duration(minEtcdMemberReadySeconds) * time.Second)
			if nextMachineUpdateTime.After(time.Now()) {
				// the latest machine with updated spec should get more time for etcd data sync
				// requeue this after
				after := time.Until(nextMachineUpdateTime)
				log.Info(fmt.Sprintf("Requeueing etcdadm cluster for updating next machine after %s", after.String()))
				return ctrl.Result{RequeueAfter: after}, nil
			}
			// otherwise, if the minimum time to wait between successive machine updates has passed,
			// check that the latest etcd member is ready
			address := getEtcdMachineAddress(newestUpToDateMachine)
			if address == "" {
				// Without an address the member cannot be health-checked, so the rollout does not proceed in this pass.
				// NOTE(review): no explicit requeue is requested here; presumably the caller requeues — confirm.
				log.Info("Newest up-to-date etcd machine has no address yet, not proceeding with rollout", "machine", newestUpToDateMachine.Name)
				return ctrl.Result{}, nil
			}
			// if member passes healthcheck, that is proof that data sync happened and we can proceed further with upgrade
			if err := r.performEndpointHealthCheck(ctx, cluster, getMemberClientURL(address), true); err != nil {
				return ctrl.Result{}, err
			}
		}
		conditions.MarkFalse(ep.EC, etcdv1.EtcdMachinesSpecUpToDateCondition, etcdv1.EtcdRollingUpdateInProgressReason, clusterv1.ConditionSeverityWarning, "Rolling %d replicas with outdated spec (%d replicas up to date)", numNeedRollout, len(ep.Machines)-numNeedRollout)
		conditions.MarkFalse(ep.EC, etcdv1.EtcdCertificatesAvailableCondition, etcdv1.EtcdRollingUpdateInProgressReason, clusterv1.ConditionSeverityWarning, "Rolling %d replicas with outdated spec (%d replicas up to date)", numNeedRollout, len(ep.Machines)-numNeedRollout)

		return r.upgradeEtcdCluster(ctx, cluster, etcdCluster, ep, needRollout)
	default:
		// make sure last upgrade operation is marked as completed.
		// NOTE: we are checking the condition already exists in order to avoid to set this condition at the first
		// reconciliation/before a rolling upgrade actually starts.
		if conditions.Has(ep.EC, etcdv1.EtcdMachinesSpecUpToDateCondition) {
			conditions.MarkTrue(ep.EC, etcdv1.EtcdMachinesSpecUpToDateCondition)

			_, hasUpgradeAnnotation := etcdCluster.Annotations[etcdv1.UpgradeInProgressAnnotation]
			if hasUpgradeAnnotation {
				delete(etcdCluster.Annotations, etcdv1.UpgradeInProgressAnnotation)
			}
		}

		// If the ETCD nodes have performed a rollout, the etcd client certs on the CP nodes need to be renewed.
		// We mark the condition EtcdCertificatesAvailable False in the rollout case and check for its value.
		// The default case is hit right after ScaleUp of ETCD nodes is completed and before the first CP comes up.
		// If EtcdCertificatesAvailable is False, this means we need to update the certs.
		// EtcdCertificatesAvailable is set to True once the certs are updated.
		if conditions.IsFalse(ep.EC, etcdv1.EtcdCertificatesAvailableCondition) {
			log.Info("Updating Etcd client certs")
			if err := r.generateCAandClientCertSecrets(ctx, cluster, etcdCluster); err != nil {
				r.Log.Error(err, "error generating etcd CA certs")
				return ctrl.Result{}, err
			}
		}
	}

	// Cases are evaluated in order, so their sequence is significant.
	switch {
	case numCurrentMachines < desiredReplicas && numCurrentMachines == 0:
		// Create first etcd machine to run etcdadm init
		log.Info("Initializing etcd cluster", "Desired", desiredReplicas, "Existing", numCurrentMachines)
		conditions.MarkFalse(etcdCluster, etcdv1.InitializedCondition, etcdv1.WaitingForEtcdadmInitReason, clusterv1.ConditionSeverityInfo, "")
		conditions.MarkFalse(etcdCluster, etcdv1.EtcdEndpointsAvailable, etcdv1.WaitingForEtcdadmEndpointsToPassHealthcheckReason, clusterv1.ConditionSeverityInfo, "")
		return r.intializeEtcdCluster(ctx, etcdCluster, cluster, ep)
	case numCurrentMachines > 0 && conditions.IsFalse(etcdCluster, etcdv1.InitializedCondition):
		// as soon as first etcd machine is up, etcdadm init would be run on it to initialize the etcd cluster, update the condition
		if !etcdCluster.Status.Initialized {
			// defer func in Reconcile will requeue it after 20 sec
			return ctrl.Result{}, nil
		}
		// since etcd cluster has been initialized
		conditions.MarkTrue(etcdCluster, etcdv1.InitializedCondition)
	case numCurrentMachines < desiredReplicas && numCurrentMachines > 0:
		log.Info("Scaling up etcd cluster", "Desired", desiredReplicas, "Existing", numCurrentMachines)
		return r.scaleUpEtcdCluster(ctx, etcdCluster, cluster, ep)
	case numCurrentMachines > desiredReplicas:
		log.Info("Scaling down etcd cluster", "Desired", desiredReplicas, "Existing", numCurrentMachines)
		// The last parameter corresponds to Machines that need to be rolled out, eg during upgrade, should always be empty here.
		return r.scaleDownEtcdCluster(ctx, etcdCluster, cluster, ep, collections.Machines{})
	// In the case that we do a scale operation on etcd clusters, remove upgradeInProgressAnnotation once scale is complete and there
	// are no more out of date machines
	case numCurrentMachines == desiredReplicas && numNeedRollout == 0:
		_, hasUpgradeAnnotation := etcdCluster.Annotations[etcdv1.UpgradeInProgressAnnotation]
		if hasUpgradeAnnotation {
			log.Info("Removing update in progress annotation", "upgrading", hasUpgradeAnnotation)
			delete(etcdCluster.Annotations, etcdv1.UpgradeInProgressAnnotation)
		}
	}

	return ctrl.Result{}, nil
}