in controllers/util/solr_scale_util.go [35:104]
// BalanceReplicasForCluster drives one step of an asynchronous "balance replicas" request
// against the given SolrCloud. It is written to be called repeatedly: each call checks the
// status of the async request identified by "balance-replicas-" + balanceCmdUniqueId and
// either submits it, waits on it, or cleans it up.
//
// Behavior per call:
//   - If the StatefulSet has no replica count set, or has 1 or fewer replicas, balancing is
//     skipped and reported as complete.
//   - If the async request is not found, a new balance request is submitted, but only when
//     every pod in the StatefulSet is ready.
//   - If the async request is found, its state decides the outcome. Once it is "completed"
//     or "failed", the async request id is deleted.
//
// Returns:
//   - balanceComplete: true when balancing finished successfully, was skipped, or is not
//     supported by the SolrCloud.
//   - requestInProgress: true when a request was just submitted, is still running, or its
//     status could not be deleted.
//   - retryLaterDuration: 5 seconds when a request is in progress and balancing is not
//     complete; zero otherwise.
//   - err: any error from checking, submitting or deleting the async request.
func BalanceReplicasForCluster(ctx context.Context, solrCloud *solr.SolrCloud, statefulSet *appsv1.StatefulSet, balanceReason string, balanceCmdUniqueId string, logger logr.Logger) (balanceComplete bool, requestInProgress bool, retryLaterDuration time.Duration, err error) {
	logger = logger.WithValues("balanceReason", balanceReason)

	// If the Cloud has 1 or zero pods, there is no reason to balance replicas.
	// An unset replica count is handled the same way.
	if statefulSet.Spec.Replicas == nil || *statefulSet.Spec.Replicas <= 1 {
		return true, false, 0, nil
	}

	requestId := "balance-replicas-" + balanceCmdUniqueId

	// First check to see if the Async Balance request has started
	asyncState, message, asyncErr := solr_api.CheckAsyncRequest(ctx, solrCloud, requestId)
	switch {
	case asyncErr != nil:
		err = asyncErr
		logger.Error(err, "Error occurred while checking the status of the balance replicas task. Will try again.", "requestId", requestId)

	case asyncState == "notfound":
		// Only start the balance command if all pods are ready
		if *statefulSet.Spec.Replicas != statefulSet.Status.ReadyReplicas {
			logger.Info("Cannot start balancing replicas until all pods are ready.", "pods", *statefulSet.Spec.Replicas, "readyPods", statefulSet.Status.ReadyReplicas)
		} else {
			// Submit new BalanceReplicas request
			rebalanceRequest := &solr_api.SolrRebalanceRequest{
				WaitForFinalState: true,
				Async:             requestId,
			}
			rebalanceResponse := &solr_api.SolrAsyncResponse{}
			err = solr_api.CallCollectionsApiV2(ctx, solrCloud, "POST", "/api/cluster/replicas/balance", nil, rebalanceRequest, rebalanceResponse)

			isUnsupportedApi, apiError := solr_api.CheckForCollectionsApiError("BALANCE_REPLICAS", rebalanceResponse.ResponseHeader, rebalanceResponse.Error)
			switch {
			case isUnsupportedApi:
				// TODO: Remove this case when Solr 9.3 is the lowest supported version
				// Log whichever error is available: the call itself may have returned nil
				// while the failure is only reported through the response body.
				logErr := err
				if logErr == nil && apiError != nil {
					logErr = apiError
				}
				logger.Error(logErr, "Could not balance replicas across the cluster, because the SolrCloud's version does not support this feature.")
				// Swallow the error after logging it, because it's not a real error.
				// Balancing is not supported, so we just need to finish the clusterOp.
				err = nil
				balanceComplete = true
			case apiError != nil:
				err = apiError
			}

			if err != nil {
				logger.Error(err, "Could not balance replicas across the cluster. Will try again.")
			} else if !balanceComplete {
				logger.Info("Started balancing replicas across cluster.", "requestId", requestId)
				requestInProgress = true
			}
		}

	default:
		logger.Info("Found async status", "requestId", requestId, "state", asyncState)
		// The balance is only considered done when the async request completed successfully.
		// Any state other than "completed" or "failed" is treated as still running.
		switch asyncState {
		case "completed":
			balanceComplete = true
			logger.Info("Replica Balancing command completed successfully")
		case "failed":
			// NOTE(review): on this path the function returns no error and a zero
			// retryLaterDuration — confirm the caller requeues so the retry actually happens.
			logger.Info("Replica Balancing command failed. Will try again", "message", message)
		default:
			requestInProgress = true
		}

		// Delete the async request Id if the async request is successful or failed.
		// If the request failed, this will cause a retry since the next reconcile won't find the async requestId in Solr.
		if !requestInProgress {
			if _, err = solr_api.DeleteAsyncRequest(ctx, solrCloud, requestId); err != nil {
				logger.Error(err, "Could not delete Async request status.", "requestId", requestId)
				// The status could not be cleaned up, so report the operation as still
				// in progress in order to come back and try the deletion again.
				balanceComplete = false
				requestInProgress = true
			}
		}
	}

	if requestInProgress && !balanceComplete {
		retryLaterDuration = time.Second * 5
	}
	return balanceComplete, requestInProgress, retryLaterDuration, err
}