in pkg/controller/stackconfigpolicy/controller.go [250:393]
// reconcileElasticsearchResources applies the given StackConfigPolicy to every
// Elasticsearch resource matched by the policy's resource selector.
//
// For each selected Elasticsearch it:
//   - checks the version against the file-based settings minimum,
//   - checks that the file settings Secret exists and can be owned by this policy,
//   - reconciles the expected file settings Secret, the secret mounts and the
//     Elasticsearch config Secret,
//   - reads the file settings currently reported by the cluster state and
//     updates the per-resource entry in status.
//
// After the loop it refreshes the dynamic watches on additional secret mounts
// and handles soft-owned Secrets of resources no longer selected by the policy.
//
// It returns the aggregated reconciliation results (errors and requeue
// requests) together with the updated status. Per-resource problems (unsupported
// version, ownership conflict, secret mount failure) are recorded and the loop
// moves on to the next resource; other errors abort the reconciliation
// immediately.
func (r *ReconcileStackConfigPolicy) reconcileElasticsearchResources(ctx context.Context, policy policyv1alpha1.StackConfigPolicy, status policyv1alpha1.StackConfigPolicyStatus) (*reconciler.Results, policyv1alpha1.StackConfigPolicyStatus) {
	defer tracing.Span(&ctx)()
	log := ulog.FromContext(ctx)
	log.V(1).Info("Reconcile Elasticsearch resources")

	results := reconciler.NewResult(ctx)

	// prepare the selector to find Elastic resources to configure
	selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
		MatchLabels:      policy.Spec.ResourceSelector.MatchLabels,
		MatchExpressions: policy.Spec.ResourceSelector.MatchExpressions,
	})
	if err != nil {
		return results.WithError(err), status
	}
	listOpts := client.ListOptions{LabelSelector: selector}
	// restrict the search to the policy namespace if it is different from the operator namespace
	if policy.Namespace != r.params.OperatorNamespace {
		listOpts.Namespace = policy.Namespace
	}

	// find the list of Elasticsearch to configure
	var esList esv1.ElasticsearchList
	if err := r.Client.List(ctx, &esList, &listOpts); err != nil {
		return results.WithError(err), status
	}

	configuredResources := esMap{}
	for _, es := range esList.Items {
		log.V(1).Info("Reconcile StackConfigPolicy", "es_namespace", es.Namespace, "es_name", es.Name)
		// per-iteration copy: &es is handed to helpers below
		es := es

		// keep the list of ES to be configured
		esNsn := k8s.ExtractNamespacedName(&es)
		configuredResources[esNsn] = es

		// version gate for the ES file-based settings feature
		v, err := version.Parse(es.Spec.Version)
		if err != nil {
			return results.WithError(err), status
		}
		if v.LT(filesettings.FileBasedSettingsMinPreVersion) {
			err = fmt.Errorf("invalid version to configure resource Elasticsearch %s/%s: actual %s, expected >= %s", es.Namespace, es.Name, v, filesettings.FileBasedSettingsMinVersion)
			// the message is passed as an argument, never as the format string
			r.recorder.Eventf(&policy, corev1.EventTypeWarning, events.EventReasonUnexpected, "%s", err.Error())
			results.WithError(err)
			err = status.AddPolicyErrorFor(esNsn, policyv1alpha1.ErrorPhase, err.Error(), policyv1alpha1.ElasticsearchResourceType)
			if err != nil {
				return results.WithError(err), status
			}
			continue
		}

		// the file Settings Secret must exist, if not it will be created empty by the ES controller
		var actualSettingsSecret corev1.Secret
		err = r.Client.Get(ctx, types.NamespacedName{Namespace: es.Namespace, Name: esv1.FileSettingsSecretName(es.Name)}, &actualSettingsSecret)
		if err != nil {
			if apierrors.IsNotFound(err) {
				// requeue if the Secret has not been created yet
				return results.WithResult(defaultRequeue), status
			}
			return results.WithError(err), status
		}

		// check that there is no other policy that already owns the Settings Secret
		currentOwner, ok := filesettings.CanBeOwnedBy(actualSettingsSecret, policy)
		if !ok {
			err = fmt.Errorf("conflict: resource Elasticsearch %s/%s already configured by StackConfigpolicy %s/%s", es.Namespace, es.Name, currentOwner.Namespace, currentOwner.Name)
			// the message is passed as an argument, never as the format string
			r.recorder.Eventf(&policy, corev1.EventTypeWarning, events.EventReasonUnexpected, "%s", err.Error())
			results.WithError(err)
			err = status.AddPolicyErrorFor(esNsn, policyv1alpha1.ConflictPhase, err.Error(), policyv1alpha1.ElasticsearchResourceType)
			if err != nil {
				return results.WithError(err), status
			}
			continue
		}

		// create the expected Settings Secret
		expectedSecret, expectedVersion, err := filesettings.NewSettingsSecretWithVersion(esNsn, &actualSettingsSecret, &policy)
		if err != nil {
			return results.WithError(err), status
		}
		if err := filesettings.ReconcileSecret(ctx, r.Client, expectedSecret, &es); err != nil {
			return results.WithError(err), status
		}

		// Copy all the Secrets that are present in spec.elasticsearch.secretMounts
		if err := reconcileSecretMounts(ctx, r.Client, es, &policy); err != nil {
			if !apierrors.IsNotFound(err) {
				// Unexpected failure: record it so the reconciliation reports an
				// error instead of silently skipping this resource.
				results.WithError(err)
				continue
			}
			// A referenced Secret does not exist: report it in the status and requeue.
			if statusErr := status.AddPolicyErrorFor(esNsn, policyv1alpha1.ErrorPhase, err.Error(), policyv1alpha1.ElasticsearchResourceType); statusErr != nil {
				return results.WithError(statusErr), status
			}
			results.WithResult(defaultRequeue)
			continue
		}

		// create expected elasticsearch config secret
		expectedConfigSecret, err := newElasticsearchConfigSecret(policy, es)
		if err != nil {
			return results.WithError(err), status
		}
		if err = filesettings.ReconcileSecret(ctx, r.Client, expectedConfigSecret, &es); err != nil {
			return results.WithError(err), status
		}

		// Check if required Elasticsearch config and secret mounts are applied.
		configAndSecretMountsApplied, err := elasticsearchConfigAndSecretMountsApplied(ctx, r.Client, policy, es)
		if err != nil {
			return results.WithError(err), status
		}

		// get /_cluster/state to get the Settings currently configured in ES
		currentSettings, err := r.getClusterStateFileSettings(ctx, es)
		if err != nil {
			err = status.AddPolicyErrorFor(esNsn, policyv1alpha1.UnknownPhase, err.Error(), policyv1alpha1.ElasticsearchResourceType)
			if err != nil {
				return results.WithError(err), status
			}
			// requeue if ES not reachable
			results.WithResult(defaultRequeue)
			// NOTE(review): execution deliberately falls through to the status
			// update below with whatever currentSettings was returned alongside
			// the error — confirm this is the intended behavior.
		}

		// update the ES resource status for this ES
		err = status.UpdateResourceStatusPhase(esNsn, newElasticsearchResourceStatus(currentSettings, expectedVersion), configAndSecretMountsApplied, policyv1alpha1.ElasticsearchResourceType)
		if err != nil {
			return results.WithError(err), status
		}
	}

	// Add dynamic watches on the additional secret mounts
	// This will also remove dynamic watches for secrets that no longer are referenced in the stackconfigpolicy
	if err := r.addDynamicWatchesOnAdditionalSecretMounts(policy); err != nil {
		return results.WithError(err), status
	}

	// reset/delete Settings secrets for resources no longer selected by this policy
	results.WithError(handleOrphanSoftOwnedSecrets(ctx, r.Client, k8s.ExtractNamespacedName(&policy), configuredResources, nil, policyv1alpha1.ElasticsearchResourceType))

	return results, status
}