func (d *defaultDriver) handleUpgrades()

in pkg/controller/elasticsearch/driver/upgrade.go [30:127]


func (d *defaultDriver) handleUpgrades(
	ctx context.Context,
	esClient esclient.Client,
	esState ESState,
	expectedResources nodespec.ResourcesList,
) *reconciler.Results {
	results := &reconciler.Results{}
	log := ulog.FromContext(ctx)

	// We need to check that all the expectations are satisfied before continuing.
	// This is to be sure that none of the previous steps has changed the state and
	// that we are not running with a stale cache.
	ok, reason, err := d.expectationsSatisfied(ctx)
	if err != nil {
		return results.WithError(err)
	}
	if !ok {
		reason := fmt.Sprintf("Nodes upgrade: %s", reason)
		return results.WithReconciliationState(defaultRequeue.WithReason(reason))
	}

	// Get the pods to upgrade
	statefulSets, err := es_sset.RetrieveActualStatefulSets(d.Client, k8s.ExtractNamespacedName(&d.ES))
	if err != nil {
		return results.WithError(err)
	}
	podsToUpgrade, err := podsToUpgrade(d.Client, statefulSets)
	if err != nil {
		return results.WithError(err)
	}
	// Get the healthy Pods (from a K8S point of view + in the ES cluster)
	healthyPods, err := healthyPods(d.Client, statefulSets, esState)
	if err != nil {
		return results.WithError(err)
	}

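	// Resolve Elasticsearch node names to node IDs: the node shutdown API
	// identifies nodes by ID, while Pods are tracked by name.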
	nodeNameToID, err := esState.NodeNameToID()
	if err != nil {
		return results.WithError(err)
	}
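	// Helper that orchestrates restart-type shutdowns through the
	// Elasticsearch node shutdown API.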
	logger := log.WithValues("namespace", d.ES.Namespace, "es_name", d.ES.Name)
	nodeShutdown := shutdown.NewNodeShutdown(esClient, nodeNameToID, esclient.Restart, d.ES.ResourceVersion, logger)

	// Maybe re-enable shards allocation and delete shutdown records if upgraded nodes are back in the cluster.
	if results.WithResults(d.maybeCompleteNodeUpgrades(ctx, esClient, esState, nodeShutdown)).HasError() {
		return results
	}

	// Get the Pods that currently exist in the StatefulSetList.
	currentPods, err := statefulSets.GetActualPods(d.Client)
	if err != nil {
		return results.WithError(err)
	}

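	// Names of the master nodes expected once reconciliation has converged,
	// used below to pick between a rolling upgrade and a full restart.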
	expectedMasters := expectedResources.MasterNodesNames()

	// Maybe upgrade some of the nodes.
	upgrade := newUpgrade(
		ctx,
		d,
		statefulSets,
		expectedResources,
		esClient,
		esState,
		nodeShutdown,
		expectedMasters,
		podsToUpgrade,
		healthyPods,
		currentPods,
	)

	var deletedPods []corev1.Pod

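	// A version upgrade of a non-HA cluster cannot preserve availability with
	// a rolling restart anyway, so all nodes are restarted at once instead.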
	isVersionUpgrade, err := isVersionUpgrade(d.ES)
	if err != nil {
		return results.WithError(err)
	}
	shouldDoFullRestartUpgrade := isNonHACluster(currentPods, expectedMasters) && isVersionUpgrade
	if shouldDoFullRestartUpgrade {
		// unconditional full cluster upgrade
		deletedPods, err = run(upgrade.DeleteAll)
	} else {
		// regular rolling upgrade
		deletedPods, err = run(upgrade.Delete)
	}
	if err != nil {
		return results.WithError(err)
	}
	if len(deletedPods) > 0 {
		// Some Pods have just been deleted; no need to try to re-enable shards allocation yet.
		return results.WithReconciliationState(defaultRequeue.WithReason("Nodes upgrade in progress"))
	}
	if len(podsToUpgrade) > len(deletedPods) {
		// Some Pods have not been upgraded yet; make sure we retry later.
		results.WithReconciliationState(defaultRequeue.WithReason("Nodes upgrade in progress"))
	}
	return results
}
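
For illustration, a minimal sketch of the non-HA check that gates the full-restart branch above. The name isNonHAClusterSketch and the three-master threshold are assumptions for this example; the real isNonHACluster in this package may weigh additional signals such as the currently running masters.

package driver

import corev1 "k8s.io/api/core/v1"

// isNonHAClusterSketch is a hypothetical stand-in for isNonHACluster: with
// fewer than three expected master nodes the cluster cannot keep master
// quorum while nodes restart one at a time, so handleUpgrades deletes all
// Pods at once (upgrade.DeleteAll) instead of rolling them.
func isNonHAClusterSketch(_ []corev1.Pod, expectedMasters []string) bool {
	return len(expectedMasters) < 3
}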