func()

in pkg/controller/stackconfigpolicy/controller.go [250:393]


func (r *ReconcileStackConfigPolicy) reconcileElasticsearchResources(ctx context.Context, policy policyv1alpha1.StackConfigPolicy, status policyv1alpha1.StackConfigPolicyStatus) (*reconciler.Results, policyv1alpha1.StackConfigPolicyStatus) {
	defer tracing.Span(&ctx)()
	log := ulog.FromContext(ctx)
	log.V(1).Info("Reconcile Elasticsearch resources")

	results := reconciler.NewResult(ctx)

	// prepare the selector to find Elastic resources to configure
	selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
		MatchLabels:      policy.Spec.ResourceSelector.MatchLabels,
		MatchExpressions: policy.Spec.ResourceSelector.MatchExpressions,
	})
	if err != nil {
		return results.WithError(err), status
	}
	listOpts := client.ListOptions{LabelSelector: selector}

	// restrict the search to the policy namespace if it is different from the operator namespace
	if policy.Namespace != r.params.OperatorNamespace {
		listOpts.Namespace = policy.Namespace
	}

	// find the list of Elasticsearch to configure
	var esList esv1.ElasticsearchList
	if err := r.Client.List(ctx, &esList, &listOpts); err != nil {
		return results.WithError(err), status
	}

	configuredResources := esMap{}
	for _, es := range esList.Items {
		log.V(1).Info("Reconcile StackConfigPolicy", "es_namespace", es.Namespace, "es_name", es.Name)
		es := es

		// keep the list of ES to be configured
		esNsn := k8s.ExtractNamespacedName(&es)
		configuredResources[esNsn] = es

		// version gate for the ES file-based settings feature
		v, err := version.Parse(es.Spec.Version)
		if err != nil {
			return results.WithError(err), status
		}
		if v.LT(filesettings.FileBasedSettingsMinPreVersion) {
			err = fmt.Errorf("invalid version to configure resource Elasticsearch %s/%s: actual %s, expected >= %s", es.Namespace, es.Name, v, filesettings.FileBasedSettingsMinVersion)
			r.recorder.Eventf(&policy, corev1.EventTypeWarning, events.EventReasonUnexpected, err.Error())
			results.WithError(err)
			err = status.AddPolicyErrorFor(esNsn, policyv1alpha1.ErrorPhase, err.Error(), policyv1alpha1.ElasticsearchResourceType)
			if err != nil {
				return results.WithError(err), status
			}
			continue
		}

		// the file Settings Secret must exist, if not it will be created empty by the ES controller
		var actualSettingsSecret corev1.Secret
		err = r.Client.Get(ctx, types.NamespacedName{Namespace: es.Namespace, Name: esv1.FileSettingsSecretName(es.Name)}, &actualSettingsSecret)
		if err != nil && apierrors.IsNotFound(err) {
			// requeue if the Secret has not been created yet
			return results.WithResult(defaultRequeue), status
		}
		if err != nil {
			return results.WithError(err), status
		}

		// check that there is no other policy that already owns the Settings Secret
		currentOwner, ok := filesettings.CanBeOwnedBy(actualSettingsSecret, policy)
		if !ok {
			err = fmt.Errorf("conflict: resource Elasticsearch %s/%s already configured by StackConfigpolicy %s/%s", es.Namespace, es.Name, currentOwner.Namespace, currentOwner.Name)
			r.recorder.Eventf(&policy, corev1.EventTypeWarning, events.EventReasonUnexpected, err.Error())
			results.WithError(err)
			err = status.AddPolicyErrorFor(esNsn, policyv1alpha1.ConflictPhase, err.Error(), policyv1alpha1.ElasticsearchResourceType)
			if err != nil {
				return results.WithError(err), status
			}
			continue
		}

		// create the expected Settings Secret
		expectedSecret, expectedVersion, err := filesettings.NewSettingsSecretWithVersion(esNsn, &actualSettingsSecret, &policy)
		if err != nil {
			return results.WithError(err), status
		}

		if err := filesettings.ReconcileSecret(ctx, r.Client, expectedSecret, &es); err != nil {
			return results.WithError(err), status
		}

		// Copy all the Secrets that are present in spec.elasticsearch.secretMounts
		if err := reconcileSecretMounts(ctx, r.Client, es, &policy); err != nil {
			if apierrors.IsNotFound(err) {
				err = status.AddPolicyErrorFor(esNsn, policyv1alpha1.ErrorPhase, err.Error(), policyv1alpha1.ElasticsearchResourceType)
				if err != nil {
					return results.WithError(err), status
				}
				results.WithResult(defaultRequeue)
			}
			continue
		}

		// create expected elasticsearch config secret
		expectedConfigSecret, err := newElasticsearchConfigSecret(policy, es)
		if err != nil {
			return results.WithError(err), status
		}

		if err = filesettings.ReconcileSecret(ctx, r.Client, expectedConfigSecret, &es); err != nil {
			return results.WithError(err), status
		}

		// Check if required Elasticsearch config and secret mounts are applied.
		configAndSecretMountsApplied, err := elasticsearchConfigAndSecretMountsApplied(ctx, r.Client, policy, es)
		if err != nil {
			return results.WithError(err), status
		}

		// get /_cluster/state to get the Settings currently configured in ES
		currentSettings, err := r.getClusterStateFileSettings(ctx, es)
		if err != nil {
			err = status.AddPolicyErrorFor(esNsn, policyv1alpha1.UnknownPhase, err.Error(), policyv1alpha1.ElasticsearchResourceType)
			if err != nil {
				return results.WithError(err), status
			}
			// requeue if ES not reachable
			results.WithResult(defaultRequeue)
		}

		// update the ES resource status for this ES
		err = status.UpdateResourceStatusPhase(esNsn, newElasticsearchResourceStatus(currentSettings, expectedVersion), configAndSecretMountsApplied, policyv1alpha1.ElasticsearchResourceType)
		if err != nil {
			return results.WithError(err), status
		}
	}

	// Add dynamic watches on the additional secret mounts
	// This will also remove dynamic watches for secrets that no longer are refrenced in the stackconfigpolicy
	if err = r.addDynamicWatchesOnAdditionalSecretMounts(policy); err != nil {
		return results.WithError(err), status
	}

	// reset/delete Settings secrets for resources no longer selected by this policy
	results.WithError(handleOrphanSoftOwnedSecrets(ctx, r.Client, k8s.ExtractNamespacedName(&policy), configuredResources, nil, policyv1alpha1.ElasticsearchResourceType))

	return results, status
}