in pkg/controller/elasticsearch/driver/upgrade.go [30:127]
func (d *defaultDriver) handleUpgrades(
ctx context.Context,
esClient esclient.Client,
esState ESState,
expectedResources nodespec.ResourcesList,
) *reconciler.Results {
results := &reconciler.Results{}
log := ulog.FromContext(ctx)
// We need to check that all the expectations are satisfied before continuing.
// This is to be sure that none of the previous steps has changed the state and
// that we are not running with a stale cache.
ok, reason, err := d.expectationsSatisfied(ctx)
if err != nil {
return results.WithError(err)
}
if !ok {
reason := fmt.Sprintf("Nodes upgrade: %s", reason)
return results.WithReconciliationState(defaultRequeue.WithReason(reason))
}
// Get the pods to upgrade
statefulSets, err := es_sset.RetrieveActualStatefulSets(d.Client, k8s.ExtractNamespacedName(&d.ES))
if err != nil {
return results.WithError(err)
}
podsToUpgrade, err := podsToUpgrade(d.Client, statefulSets)
if err != nil {
return results.WithError(err)
}
// Get the healthy Pods (from a K8S point of view + in the ES cluster)
healthyPods, err := healthyPods(d.Client, statefulSets, esState)
if err != nil {
return results.WithError(err)
}
nodeNameToID, err := esState.NodeNameToID()
if err != nil {
results.WithError(err)
}
logger := log.WithValues("namespace", d.ES.Namespace, "es_name", d.ES.Name)
nodeShutdown := shutdown.NewNodeShutdown(esClient, nodeNameToID, esclient.Restart, d.ES.ResourceVersion, logger)
// Maybe re-enable shards allocation and delete shutdowns if upgraded nodes are back into the cluster.
if results.WithResults(d.maybeCompleteNodeUpgrades(ctx, esClient, esState, nodeShutdown)).HasError() {
return results
}
// Get the list of pods currently existing in the StatefulSetList
currentPods, err := statefulSets.GetActualPods(d.Client)
if err != nil {
return results.WithError(err)
}
expectedMasters := expectedResources.MasterNodesNames()
// Maybe upgrade some of the nodes.
upgrade := newUpgrade(
ctx,
d,
statefulSets,
expectedResources,
esClient,
esState,
nodeShutdown,
expectedMasters,
podsToUpgrade,
healthyPods,
currentPods,
)
var deletedPods []corev1.Pod
isVersionUpgrade, err := isVersionUpgrade(d.ES)
if err != nil {
return results.WithError(err)
}
shouldDoFullRestartUpgrade := isNonHACluster(currentPods, expectedMasters) && isVersionUpgrade
if shouldDoFullRestartUpgrade {
// unconditional full cluster upgrade
deletedPods, err = run(upgrade.DeleteAll)
} else {
// regular rolling upgrade
deletedPods, err = run(upgrade.Delete)
}
if err != nil {
return results.WithError(err)
}
if len(deletedPods) > 0 {
// Some Pods have just been deleted, we don't need to try to enable shards allocation.
return results.WithReconciliationState(defaultRequeue.WithReason("Nodes upgrade in progress"))
}
if len(podsToUpgrade) > len(deletedPods) {
// Some Pods have not been updated, ensure that we retry later
results.WithReconciliationState(defaultRequeue.WithReason("Nodes upgrade in progress"))
}
return results
}