func()

in controllers/periodic_healthcheck.go [126:243]


// periodicEtcdMembersHealthCheck runs one health-check pass over the etcd
// machines of etcdCluster and removes owned members that have failed too many
// consecutive checks.
//
// Per-cluster bookkeeping is read from etcdadmClusterMapper[etcdCluster.UID]:
//   - unhealthyMembersFrequency maps an endpoint to its consecutive failure count;
//   - unhealthyMembersToRemove maps an endpoint to the machine queued for removal.
//
// The failure threshold is maxUnhealthyCount unless the
// etcdv1.HealthCheckRetriesAnnotation annotation holds a valid non-negative
// integer, in which case that value is used.
//
// It returns an error when Spec.Replicas is nil, when the etcd machines cannot
// be listed, when one or more member removals fail (aggregated), or when the
// final status update fails. It returns nil when there are no etcd machines or
// no member is queued for removal.
func (r *EtcdadmClusterReconciler) periodicEtcdMembersHealthCheck(ctx context.Context, cluster *clusterv1.Cluster, etcdCluster *etcdv1.EtcdadmCluster, etcdadmClusterMapper map[types.UID]etcdadmClusterMemberHealthConfig) error {
	log := r.Log.WithValues("EtcdadmCluster", klog.KObj(etcdCluster))

	if etcdCluster.Spec.Replicas == nil {
		err := fmt.Errorf("Replicas is nil")
		log.Error(err, "Error performing healthcheck")
		return err
	}

	desiredReplicas := int(*etcdCluster.Spec.Replicas)
	etcdMachines, err := collections.GetFilteredMachinesForCluster(ctx, r.uncachedClient, cluster, EtcdClusterMachines(cluster.Name, etcdCluster.Name))
	if err != nil {
		// Surface the failure instead of falling through: continuing here would
		// report "no etcd machines" and hide the listing error from the caller.
		log.Error(err, "Error filtering machines for etcd cluster")
		return err
	}
	ownedMachines := etcdMachines.Filter(collections.OwnedMachines(etcdCluster))

	currClusterHFConfig := etcdadmClusterMapper[etcdCluster.UID]
	if len(etcdMachines) == 0 {
		log.Info("Skipping healthcheck because there are no etcd machines")
		return nil
	}

	// Clean up old machines: drop failure counters whose key no longer matches
	// the client URL of any current etcd machine.
	for ip := range currClusterHFConfig.unhealthyMembersFrequency {
		found := false
		for _, machine := range etcdMachines {
			if getMemberClientURL(getEtcdMachineAddress(machine)) == ip {
				found = true
				break
			}
		}
		if !found {
			log.Info("Removing member from unhealthyMembersFrequency, member does not exist", "member", ip)
			delete(currClusterHFConfig.unhealthyMembersFrequency, ip)
		}
	}

	log.Info("Performing healthchecks on the following etcd machines", "machines", klog.KObjSlice(etcdMachines.UnsortedList()))
	for _, etcdMachine := range etcdMachines {
		endpoint := getMachineEtcdEndpoint(etcdMachine)
		if endpoint == "" {
			log.Info("Member in bootstrap phase, ignoring")
			continue
		}

		if checkErr := r.performEndpointHealthCheck(ctx, cluster, endpoint, false); checkErr != nil {
			// Every failing endpoint is counted (owned or not) because the
			// quorum check below is based on the size of this map.
			currClusterHFConfig.unhealthyMembersFrequency[endpoint]++

			// Only check if the machine should be removed if it is owned.
			if _, owned := ownedMachines[etcdMachine.Name]; !owned {
				continue
			}

			// Member failed healthcheck, so report its updated unhealthy count.
			log.Info("Member failed healthcheck, adding to unhealthy members list", "machine", etcdMachine, "IP", endpoint,
				"unhealthy frequency", currClusterHFConfig.unhealthyMembersFrequency[endpoint])

			unhealthyCount := maxUnhealthyCount
			if val, set := etcdCluster.Annotations[etcdv1.HealthCheckRetriesAnnotation]; set {
				// Override the default only with a valid value. An unparsable
				// or negative value must not be used as the threshold: it would
				// be <= 0 and queue the member for removal on its first failure.
				if retries, convErr := strconv.Atoi(val); convErr != nil || retries < 0 {
					log.Info("healthcheck-retries annotation configured with invalid value, using default retries")
				} else {
					unhealthyCount = retries
				}
			}

			if currClusterHFConfig.unhealthyMembersFrequency[endpoint] >= unhealthyCount {
				log.Info("Adding to list of unhealthy members to remove", "member", endpoint)
				// Member has been unresponsive, add the machine to the unhealthyMembersToRemove queue.
				currClusterHFConfig.unhealthyMembersToRemove[endpoint] = etcdMachine
			}
			continue
		}

		// Member passed healthcheck: forget any previous failures, since only
		// consecutive failures should lead to member removal.
		if _, markedUnhealthy := currClusterHFConfig.unhealthyMembersFrequency[endpoint]; markedUnhealthy {
			log.Info("Removing from total unhealthy members list", "member", endpoint)
			delete(currClusterHFConfig.unhealthyMembersFrequency, endpoint)
		}
		if _, markedToDelete := currClusterHFConfig.unhealthyMembersToRemove[endpoint]; markedToDelete {
			log.Info("Removing from list of unhealthy members to remove", "member", endpoint)
			delete(currClusterHFConfig.unhealthyMembersToRemove, endpoint)
		}
	}

	if len(currClusterHFConfig.unhealthyMembersToRemove) == 0 {
		return nil
	}

	var retErr error
	// Check if quorum is preserved before deleting any machines: the members
	// without a recorded failure must still form a strict majority.
	if len(etcdMachines)-len(currClusterHFConfig.unhealthyMembersFrequency) >= len(etcdMachines)/2+1 {
		// Only owned machines are ever queued in unhealthyMembersToRemove.
		for machineEndpoint, machineToDelete := range currClusterHFConfig.unhealthyMembersToRemove {
			// Only remove one machine at a time: re-read the owned machines on
			// every iteration and wait while fewer than the desired number of
			// replicas exist without a deletion timestamp.
			currentMachines := r.getOwnedMachines(ctx, cluster, *etcdCluster)
			currentMachines = currentMachines.Filter(collections.Not(collections.HasDeletionTimestamp))
			if len(currentMachines) < desiredReplicas {
				log.Info("Waiting for new replica to be created before deleting additional replicas")
				continue
			}
			if err := r.removeEtcdMachine(ctx, etcdCluster, cluster, machineToDelete, getEtcdMachineAddressFromClientURL(machineEndpoint)); err != nil {
				// Log and save the error and continue deletion of other members; deletion of this
				// member will be retried since it is still part of unhealthyMembersToRemove.
				if machineToDelete == nil {
					log.Error(err, "error removing etcd member machine, machine not found", "endpoint", machineEndpoint)
				} else {
					log.Error(err, "error removing etcd member machine", "member", machineToDelete.Name, "endpoint", machineEndpoint)
				}
				retErr = multierror.Append(retErr, err)
				continue
			}
			delete(currClusterHFConfig.unhealthyMembersToRemove, machineEndpoint)
		}
		if retErr != nil {
			return retErr
		}
	} else {
		log.Info("Not safe to remove etcd machines, quorum not preserved")
	}

	// Members were queued for removal during this pass, so mark the cluster as
	// not ready and persist that status.
	etcdCluster.Status.Ready = false
	return r.Client.Status().Update(ctx, etcdCluster)
}