in controllers/periodic_healthcheck.go [126:243]
// periodicEtcdMembersHealthCheck runs one health-check pass over the etcd
// machines of etcdCluster and removes owned members that have failed the
// check too many times in a row.
//
// The pass works on the per-cluster entry of etcdadmClusterMapper (keyed by
// etcdCluster.UID):
//   - unhealthyMembersFrequency counts consecutive failed checks per endpoint;
//   - unhealthyMembersToRemove holds the machines queued for removal.
//
// Both maps are modified in place, so the counters carry over between calls.
// NOTE(review): this assumes the caller has already stored an entry with
// non-nil maps for this cluster; writing into the maps of a missing entry
// would panic. Confirm against the caller.
//
// A member is queued for removal once its failure count reaches the retry
// limit: maxUnhealthyCount by default, or the value of the
// etcdv1.HealthCheckRetriesAnnotation annotation when that value is a
// non-negative integer. Only machines owned by etcdCluster are ever queued.
// A single successful check clears both the counter and the queue entry.
//
// Returns an error when Spec.Replicas is nil, when the etcd machines cannot be
// listed, when removing one or more members fails (errors are aggregated), or
// when the final status update fails. Returns nil when there is nothing to
// check or nothing queued for removal.
func (r *EtcdadmClusterReconciler) periodicEtcdMembersHealthCheck(ctx context.Context, cluster *clusterv1.Cluster, etcdCluster *etcdv1.EtcdadmCluster, etcdadmClusterMapper map[types.UID]etcdadmClusterMemberHealthConfig) error {
	log := r.Log.WithValues("EtcdadmCluster", klog.KObj(etcdCluster))

	if etcdCluster.Spec.Replicas == nil {
		err := fmt.Errorf("Replicas is nil")
		log.Error(err, "Error performing healthcheck")
		return err
	}
	desiredReplicas := int(*etcdCluster.Spec.Replicas)

	etcdMachines, err := collections.GetFilteredMachinesForCluster(ctx, r.uncachedClient, cluster, EtcdClusterMachines(cluster.Name, etcdCluster.Name))
	if err != nil {
		// Without a reliable machine list the pass cannot do anything useful;
		// surface the failure instead of reporting "no etcd machines".
		log.Error(err, "Error filtering machines for etcd cluster")
		return err
	}

	ownedMachines := etcdMachines.Filter(collections.OwnedMachines(etcdCluster))
	currClusterHFConfig := etcdadmClusterMapper[etcdCluster.UID]

	if len(etcdMachines) == 0 {
		log.Info("Skipping healthcheck because there are no etcd machines")
		return nil
	}

	// Clean up counters that belong to machines which no longer exist.
	for ip := range currClusterHFConfig.unhealthyMembersFrequency {
		found := false
		for _, machine := range etcdMachines {
			if getMemberClientURL(getEtcdMachineAddress(machine)) == ip {
				found = true
				break
			}
		}
		if !found {
			log.Info("Removing member from unhealthyMembersFrequency, member does not exist", "member", ip)
			delete(currClusterHFConfig.unhealthyMembersFrequency, ip)
		}
	}

	log.Info("Performing healthchecks on the following etcd machines", "machines", klog.KObjSlice(etcdMachines.UnsortedList()))
	for _, etcdMachine := range etcdMachines {
		endpoint := getMachineEtcdEndpoint(etcdMachine)
		if endpoint == "" {
			log.Info("Member in bootstrap phase, ignoring")
			continue
		}

		if checkErr := r.performEndpointHealthCheck(ctx, cluster, endpoint, false); checkErr != nil {
			// Every failing member is counted (the quorum check below relies on
			// it), but only owned machines are considered for removal.
			currClusterHFConfig.unhealthyMembersFrequency[endpoint]++
			if _, found := ownedMachines[etcdMachine.Name]; !found {
				continue
			}

			// Member failed healthcheck, so report its updated unhealthy count.
			log.Info("Member failed healthcheck, adding to unhealthy members list", "machine", etcdMachine, "IP", endpoint,
				"unhealthy frequency", currClusterHFConfig.unhealthyMembersFrequency[endpoint])

			unhealthyCount := maxUnhealthyCount
			if val, set := etcdCluster.Annotations[etcdv1.HealthCheckRetriesAnnotation]; set {
				retries, convErr := strconv.Atoi(val)
				if convErr != nil || retries < 0 {
					// Keep the default: an invalid value must not lower the
					// threshold to zero and trigger immediate removal.
					log.Info("healthcheck-retries annotation configured with invalid value, using default retries")
				} else {
					unhealthyCount = retries
				}
			}

			if currClusterHFConfig.unhealthyMembersFrequency[endpoint] >= unhealthyCount {
				log.Info("Adding to list of unhealthy members to remove", "member", endpoint)
				// Member has been unresponsive, add the machine to the unhealthyMembersToRemove queue.
				currClusterHFConfig.unhealthyMembersToRemove[endpoint] = etcdMachine
			}
			continue
		}

		// Member passed the healthcheck: reset its state, since only
		// consecutive failures should lead to member removal.
		if _, markedUnhealthy := currClusterHFConfig.unhealthyMembersFrequency[endpoint]; markedUnhealthy {
			log.Info("Removing from total unhealthy members list", "member", endpoint)
			delete(currClusterHFConfig.unhealthyMembersFrequency, endpoint)
		}
		if _, markedToDelete := currClusterHFConfig.unhealthyMembersToRemove[endpoint]; markedToDelete {
			log.Info("Removing from list of unhealthy members to remove", "member", endpoint)
			delete(currClusterHFConfig.unhealthyMembersToRemove, endpoint)
		}
	}

	if len(currClusterHFConfig.unhealthyMembersToRemove) == 0 {
		return nil
	}

	var retErr error
	// Check that quorum is preserved before deleting any machines: the members
	// without a failure count must still form a majority of all etcd machines.
	if len(etcdMachines)-len(currClusterHFConfig.unhealthyMembersFrequency) >= len(etcdMachines)/2+1 {
		// Only owned machines are ever queued, so only owned machines are touched here.
		for machineEndpoint, machineToDelete := range currClusterHFConfig.unhealthyMembersToRemove {
			// Only remove one machine at a time: the owned machines are re-read on
			// every iteration, and removal is skipped while fewer than the desired
			// number of machines without a deletion timestamp exist.
			currentMachines := r.getOwnedMachines(ctx, cluster, *etcdCluster)
			currentMachines = currentMachines.Filter(collections.Not(collections.HasDeletionTimestamp))
			if len(currentMachines) < desiredReplicas {
				log.Info("Waiting for new replica to be created before deleting additional replicas")
				continue
			}

			if removeErr := r.removeEtcdMachine(ctx, etcdCluster, cluster, machineToDelete, getEtcdMachineAddressFromClientURL(machineEndpoint)); removeErr != nil {
				// Log and save the error, then continue with the other members; this
				// member stays in unhealthyMembersToRemove so its deletion is retried.
				if machineToDelete == nil {
					log.Error(removeErr, "error removing etcd member machine, machine not found", "endpoint", machineEndpoint)
				} else {
					log.Error(removeErr, "error removing etcd member machine", "member", machineToDelete.Name, "endpoint", machineEndpoint)
				}
				retErr = multierror.Append(retErr, removeErr)
				continue
			}

			delete(currClusterHFConfig.unhealthyMembersToRemove, machineEndpoint)
		}

		if retErr != nil {
			return retErr
		}
	} else {
		log.Info("Not safe to remove etcd machines, quorum not preserved")
	}

	// Members were queued for removal during this pass, so mark the cluster as
	// not ready and persist that status.
	etcdCluster.Status.Ready = false
	return r.Client.Status().Update(ctx, etcdCluster)
}