in pkg/autohealing/cloudprovider/openstack/provider.go [322:489]
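// Repair replaces the given unhealthy nodes. Worker nodes are removed and
// recreated through the Magnum cluster resize API; master nodes are rebuilt by
// marking their Heat resources unhealthy and triggering a stack update. Nodes
// repaired by a first-time reboot are skipped. Finally, the Kubernetes Node
// objects of the replaced nodes are deleted from the cluster.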
func (provider OpenStackCloudProvider) Repair(nodes []healthcheck.NodeInfo) error {
	if len(nodes) == 0 {
		return nil
	}

	masters := []healthcheck.NodeInfo{}
	workers := []healthcheck.NodeInfo{}

	clusterName := provider.Config.ClusterName
	isWorkerNode := nodes[0].IsWorker
	log.Infof("the node type to be repaired is worker node: %t", isWorkerNode)
	if isWorkerNode {
		workers = nodes
	} else {
		masters = nodes
	}
	firstTimeRebootNodes := make(map[string]healthcheck.NodeInfo)

	err := provider.UpdateHealthStatus(masters, workers)
	if err != nil {
		return fmt.Errorf("failed to update the health status of cluster %s, error: %v", clusterName, err)
	}
	cluster, err := clusters.Get(provider.Magnum, clusterName).Extract()
	if err != nil {
		return fmt.Errorf("failed to get the cluster %s, error: %v", clusterName, err)
	}
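	// Worker nodes are replaced through the Magnum cluster resize API; master
	// nodes are rebuilt by a Heat stack update in the else branch below.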
	if isWorkerNode {
		for _, n := range nodes {
			nodesToReplace := sets.NewString()
			machineID := uuid.Parse(n.KubeNode.Status.NodeInfo.MachineID)
			if machineID == nil {
				log.Warningf("Failed to get the correct server ID for server %s", n.KubeNode.Name)
				continue
			}
			serverID := machineID.String()

			if processed, err := provider.firstTimeRepair(n, serverID, firstTimeRebootNodes); err != nil {
				log.Warningf("Failed to check if the node %s is in first time repair, error: %v", serverID, err)
			} else if processed {
				log.Infof("Node %s has been processed", serverID)
				continue
			}

			if _, err := provider.waitForServerDetachVolumes(serverID, 30*time.Second); err != nil {
				log.Warningf("Failed to detach volumes from server %s, error: %v", serverID, err)
			}

			if err := provider.waitForServerPoweredOff(serverID, 30*time.Second); err != nil {
				log.Warningf("Failed to shutdown the server %s, error: %v", serverID, err)
			}
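			// Resize the cluster with an unchanged node count and the broken server
			// listed in NodesToRemove, so Magnum replaces it with a fresh node.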
			nodesToReplace.Insert(serverID)
			ng, err := provider.getNodeGroup(clusterName, n)
			ngName := "default-worker"
			ngNodeCount := &cluster.NodeCount
			if err == nil {
				ngName = ng.Name
				ngNodeCount = &ng.NodeCount
			}

			opts := clusters.ResizeOpts{
				NodeGroup:     ngName,
				NodeCount:     ngNodeCount,
				NodesToRemove: nodesToReplace.List(),
			}

			clusters.Resize(provider.Magnum, clusterName, opts)
			// Wait 10 seconds to make sure Magnum has received the resize request,
			// and to avoid sending all of the resize API calls at the same time.
			time.Sleep(10 * time.Second)
			// TODO: Ignore the result value until https://github.com/gophercloud/gophercloud/pull/1649 is merged.
			//if ret.Err != nil {
			//	return fmt.Errorf("failed to resize cluster %s, error: %v", clusterName, ret.Err)
			//}

			delete(unHealthyNodes, serverID)
			log.Infof("Cluster %s resized", clusterName)
		}
	} else {
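		// Master nodes: mark the corresponding Heat resources (server and root
		// volume) unhealthy, then trigger a stack update so Heat rebuilds them.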
		clusterStackName, err := provider.getStackName(cluster.StackID)
		if err != nil {
			return fmt.Errorf("failed to get the Heat stack for cluster %s, error: %v", clusterName, err)
		}

		// In order to rebuild the nodes by Heat stack update, we need to know the
		// parent stack ID of the resources and mark them unhealthy first.
		allMapping, err := provider.getAllStackResourceMapping(clusterStackName, cluster.StackID)
		if err != nil {
			return fmt.Errorf("failed to get the resource stack mapping for cluster %s, error: %v", clusterName, err)
		}

		opts := stackresources.MarkUnhealthyOpts{
			MarkUnhealthy:        true,
			ResourceStatusReason: "Mark resource unhealthy by autohealing service",
		}
		for _, n := range nodes {
			machineID := uuid.Parse(n.KubeNode.Status.NodeInfo.MachineID)
			if machineID == nil {
				log.Warningf("Failed to get the correct server ID for server %s", n.KubeNode.Name)
				continue
			}
			serverID := machineID.String()

			if processed, err := provider.firstTimeRepair(n, serverID, firstTimeRebootNodes); err != nil {
				log.Warningf("Failed to check if the node %s is in first time repair, error: %v", serverID, err)
			} else if processed {
				log.Infof("Node %s has been processed", serverID)
				continue
			}
			if rootVolumeID, err := provider.waitForServerDetachVolumes(serverID, 30*time.Second); err != nil {
				log.Warningf("Failed to detach volumes from server %s, error: %v", serverID, err)
			} else {
				// Mark root volume as unhealthy
				if rootVolumeID != "" {
					err = stackresources.MarkUnhealthy(provider.Heat, allMapping[serverID].StackName, allMapping[serverID].StackID, rootVolumeID, opts).ExtractErr()
					if err != nil {
						log.Errorf("failed to mark resource %s unhealthy, error: %v", rootVolumeID, err)
					}
				}
			}
			if err := provider.waitForServerPoweredOff(serverID, 180*time.Second); err != nil {
				log.Warningf("Failed to shutdown the server %s, error: %v", serverID, err)
				// If the server fails to power off within 180s, force delete it to
				// avoid the stack update failing later.
				res := servers.ForceDelete(provider.Nova, serverID)
				if res.Err != nil {
					log.Warningf("Failed to delete the server %s, error: %v", serverID, res.Err)
				}
			}
log.Infof("Marking Nova VM %s(Heat resource %s) unhealthy for Heat stack %s", serverID, allMapping[serverID].ResourceID, cluster.StackID)
// Mark VM as unhealthy
err = stackresources.MarkUnhealthy(provider.Heat, allMapping[serverID].StackName, allMapping[serverID].StackID, allMapping[serverID].ResourceID, opts).ExtractErr()
if err != nil {
log.Errorf("failed to mark resource %s unhealthy, error: %v", serverID, err)
}
delete(unHealthyNodes, serverID)
}
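		// Trigger a stack update; Heat rebuilds the resources that were marked
		// unhealthy above.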
		if err := stacks.UpdatePatch(provider.Heat, clusterStackName, cluster.StackID, stacks.UpdateOpts{}).ExtractErr(); err != nil {
			return fmt.Errorf("failed to update Heat stack to rebuild resources, error: %v", err)
		}

		log.Infof("Started Heat stack update to rebuild resources, cluster: %s, stack: %s", clusterName, cluster.StackID)
	}

	// Remove the broken nodes from the cluster
	for _, n := range nodes {
		serverID := uuid.Parse(n.KubeNode.Status.NodeInfo.MachineID).String()
		if _, found := firstTimeRebootNodes[serverID]; found {
			log.Infof("Skip node delete for %s because it's repaired by reboot", serverID)
			continue
		}

		if err := provider.KubeClient.CoreV1().Nodes().Delete(context.TODO(), n.KubeNode.Name, metav1.DeleteOptions{}); err != nil {
			log.Errorf("Failed to remove the node %s from cluster, error: %v", n.KubeNode.Name, err)
		}
	}

	return nil
}