pkg/controller/stackconfigpolicy/controller.go
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package stackconfigpolicy

import (
"context"
"fmt"
"reflect"
"regexp"
"strings"
"sync/atomic"
"time"
"go.elastic.co/apm/v2"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
commonv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/common/v1"
esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1"
kibanav1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/kibana/v1"
policyv1alpha1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/stackconfigpolicy/v1alpha1"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common"
commonesclient "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/esclient"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/events"
commonlabels "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/labels"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/license"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/operator"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/reconciler"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/tracing"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/version"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/watches"
esclient "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/client"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/filesettings"
eslabel "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/label"
kblabel "github.com/elastic/cloud-on-k8s/v3/pkg/controller/kibana/label"
"github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s"
ulog "github.com/elastic/cloud-on-k8s/v3/pkg/utils/log"
)
const (
controllerName = "stackconfigpolicy-controller"
)
var (
defaultRequeue = reconcile.Result{Requeue: true, RequeueAfter: 30 * time.Second}
)
// Add creates a new StackConfigPolicy Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller
// and Start it when the Manager is Started.
func Add(mgr manager.Manager, params operator.Parameters) error {
r := newReconciler(mgr, params)
c, err := common.NewController(mgr, controllerName, r, params)
if err != nil {
return err
}
return addWatches(mgr, c, r)
}
// newReconciler returns a new reconcile.Reconciler of StackConfigPolicy.
func newReconciler(mgr manager.Manager, params operator.Parameters) *ReconcileStackConfigPolicy {
k8sClient := mgr.GetClient()
return &ReconcileStackConfigPolicy{
Client: k8sClient,
esClientProvider: commonesclient.NewClient,
recorder: mgr.GetEventRecorderFor(controllerName),
licenseChecker: license.NewLicenseChecker(k8sClient, params.OperatorNamespace),
params: params,
dynamicWatches: watches.NewDynamicWatches(),
}
}
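// addWatches registers the watches that trigger StackConfigPolicy reconciliations: the policies themselves, the
// Elasticsearch and Kibana resources they can select, and the Secrets they soft-own or dynamically reference.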
func addWatches(mgr manager.Manager, c controller.Controller, r *ReconcileStackConfigPolicy) error {
// watch for changes to StackConfigPolicy
if err := c.Watch(source.Kind(mgr.GetCache(), &policyv1alpha1.StackConfigPolicy{}, &handler.TypedEnqueueRequestForObject[*policyv1alpha1.StackConfigPolicy]{})); err != nil {
return err
}
// watch for changes to Elasticsearch and reconcile all StackConfigPolicy resources
if err := c.Watch(source.Kind[client.Object](mgr.GetCache(), &esv1.Elasticsearch{}, reconcileRequestForAllPolicies(r.Client))); err != nil {
return err
}
// watch for changes to Kibana and reconcile all StackConfigPolicy resources
if err := c.Watch(source.Kind[client.Object](mgr.GetCache(), &kibanav1.Kibana{}, reconcileRequestForAllPolicies(r.Client))); err != nil {
return err
}
// watch Secrets soft owned by StackConfigPolicy
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}, reconcileRequestForSoftOwnerPolicy())); err != nil {
return err
}
// watch dynamically referenced secrets
return c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}, r.dynamicWatches.Secrets))
}
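// reconcileRequestForSoftOwnerPolicy returns an event handler that maps a Secret to a reconcile request for the
// StackConfigPolicy that soft-owns it, if any.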
func reconcileRequestForSoftOwnerPolicy() handler.TypedEventHandler[*corev1.Secret, reconcile.Request] {
return handler.TypedEnqueueRequestsFromMapFunc[*corev1.Secret](func(ctx context.Context, secret *corev1.Secret) []reconcile.Request {
softOwner, referenced := reconciler.SoftOwnerRefFromLabels(secret.GetLabels())
if !referenced || softOwner.Kind != policyv1alpha1.Kind {
return nil
}
return []reconcile.Request{
{NamespacedName: types.NamespacedName{Namespace: softOwner.Namespace, Name: softOwner.Name}},
}
})
}
// reconcileRequestForAllPolicies returns the requests to reconcile all StackConfigPolicy resources when a watched
// Elasticsearch or Kibana resource changes.
func reconcileRequestForAllPolicies(clnt k8s.Client) handler.TypedEventHandler[client.Object, reconcile.Request] {
return handler.TypedEnqueueRequestsFromMapFunc[client.Object](func(ctx context.Context, obj client.Object) []reconcile.Request {
var stackConfigList policyv1alpha1.StackConfigPolicyList
if err := clnt.List(ctx, &stackConfigList); err != nil {
ulog.Log.Error(err, "Failed to list StackConfigPolicy resources while watching Elasticsearch and Kibana")
return nil
}
requests := make([]reconcile.Request, 0)
for _, stackConfig := range stackConfigList.Items {
stackConfig := stackConfig
requests = append(requests, reconcile.Request{NamespacedName: k8s.ExtractNamespacedName(&stackConfig)})
}
return requests
})
}
var _ reconcile.Reconciler = &ReconcileStackConfigPolicy{}
// ReconcileStackConfigPolicy reconciles a StackConfigPolicy object
type ReconcileStackConfigPolicy struct {
k8s.Client
esClientProvider commonesclient.Provider
recorder record.EventRecorder
licenseChecker license.Checker
params operator.Parameters
dynamicWatches watches.DynamicWatches
// iteration is the number of times this controller has run its Reconcile method
iteration uint64
}
// Reconcile reads the state of the cluster for a StackConfigPolicy object and makes changes based on the state read
// and what is in the StackConfigPolicy.Spec.
func (r *ReconcileStackConfigPolicy) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
ctx = common.NewReconciliationContext(ctx, &r.iteration, r.params.Tracer, controllerName, "policy_name", request)
defer common.LogReconciliationRun(ulog.FromContext(ctx))()
defer tracing.EndContextTransaction(ctx)
// retrieve the StackConfigPolicy resource
var policy policyv1alpha1.StackConfigPolicy
err := r.Client.Get(ctx, request.NamespacedName, &policy)
if err != nil {
if apierrors.IsNotFound(err) {
return reconcile.Result{}, r.onDelete(ctx,
types.NamespacedName{
Namespace: request.Namespace,
Name: request.Name,
})
}
return reconcile.Result{}, tracing.CaptureError(ctx, err)
}
// skip unmanaged resources
if common.IsUnmanaged(ctx, &policy) {
ulog.FromContext(ctx).Info("Object is currently not managed by this controller. Skipping reconciliation")
return reconcile.Result{}, nil
}
// the StackConfigPolicy is being deleted: nothing to do other than remove the watches and clean up soft-owned secrets
if policy.IsMarkedForDeletion() {
return reconcile.Result{}, r.onDelete(ctx, k8s.ExtractNamespacedName(&policy))
}
// main reconciliation logic
results, status := r.doReconcile(ctx, policy)
// update status
if err := r.updateStatus(ctx, policy, status); err != nil {
if apierrors.IsConflict(err) {
return results.WithResult(reconcile.Result{Requeue: true}).Aggregate()
}
results.WithError(err)
}
return results.Aggregate()
}
// esMap is a map of Elasticsearch resources indexed by NamespacedName, used to keep track of the Elasticsearch
// clusters configured by a StackConfigPolicy.
type esMap map[types.NamespacedName]esv1.Elasticsearch

// kbMap is a map of Kibana resources indexed by NamespacedName, used to keep track of the Kibana
// instances configured by a StackConfigPolicy.
type kbMap map[types.NamespacedName]kibanav1.Kibana
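// doReconcile implements the main reconciliation logic: it checks that Enterprise features are enabled, validates the
// policy and reconciles the Elasticsearch and Kibana resources it selects.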
func (r *ReconcileStackConfigPolicy) doReconcile(ctx context.Context, policy policyv1alpha1.StackConfigPolicy) (*reconciler.Results, policyv1alpha1.StackConfigPolicyStatus) {
log := ulog.FromContext(ctx)
log.V(1).Info("Reconcile StackConfigPolicy")
results := reconciler.NewResult(ctx)
status := policyv1alpha1.NewStatus(policy)
defer status.Update()
// Enterprise license check
enabled, err := r.licenseChecker.EnterpriseFeaturesEnabled(ctx)
if err != nil {
return results.WithError(err), status
}
if !enabled {
msg := "StackConfigPolicy is an enterprise feature. Enterprise features are disabled"
log.Info(msg)
r.recorder.Eventf(&policy, corev1.EventTypeWarning, events.EventReconciliationError, msg)
// we don't have a good way of watching for the license level to change so just requeue with a reasonably long delay
return results.WithResult(reconcile.Result{Requeue: true, RequeueAfter: 5 * time.Minute}), status
}
// run validation in case the webhook is disabled
if err := r.validate(ctx, &policy); err != nil {
status.Phase = policyv1alpha1.InvalidPhase
r.recorder.Eventf(&policy, corev1.EventTypeWarning, events.EventReasonValidation, err.Error())
return results.WithError(err), status
}
// reconcile elasticsearch resources
results, status = r.reconcileElasticsearchResources(ctx, policy, status)
// reconcile kibana resources
kibanaResults, status := r.reconcileKibanaResources(ctx, policy, status)
// Combine results from kibana reconciliation with results from Elasticsearch reconciliation
results.WithResults(kibanaResults)
// requeue if not ready
if status.Phase != policyv1alpha1.ReadyPhase {
results.WithResult(defaultRequeue)
}
return results, status
}
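// reconcileElasticsearchResources configures the Elasticsearch clusters selected by the policy resource selector: for
// each cluster it reconciles the file settings Secret, the additional secret mounts and the Elasticsearch config
// Secret, then compares the file settings version reported by the cluster with the expected one to update the
// policy status.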
func (r *ReconcileStackConfigPolicy) reconcileElasticsearchResources(ctx context.Context, policy policyv1alpha1.StackConfigPolicy, status policyv1alpha1.StackConfigPolicyStatus) (*reconciler.Results, policyv1alpha1.StackConfigPolicyStatus) {
defer tracing.Span(&ctx)()
log := ulog.FromContext(ctx)
log.V(1).Info("Reconcile Elasticsearch resources")
results := reconciler.NewResult(ctx)
// prepare the selector to find Elastic resources to configure
selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
MatchLabels: policy.Spec.ResourceSelector.MatchLabels,
MatchExpressions: policy.Spec.ResourceSelector.MatchExpressions,
})
if err != nil {
return results.WithError(err), status
}
listOpts := client.ListOptions{LabelSelector: selector}
// restrict the search to the policy namespace if it is different from the operator namespace
if policy.Namespace != r.params.OperatorNamespace {
listOpts.Namespace = policy.Namespace
}
// find the list of Elasticsearch to configure
var esList esv1.ElasticsearchList
if err := r.Client.List(ctx, &esList, &listOpts); err != nil {
return results.WithError(err), status
}
configuredResources := esMap{}
for _, es := range esList.Items {
log.V(1).Info("Reconcile StackConfigPolicy", "es_namespace", es.Namespace, "es_name", es.Name)
es := es
// keep the list of ES to be configured
esNsn := k8s.ExtractNamespacedName(&es)
configuredResources[esNsn] = es
// version gate for the ES file-based settings feature
v, err := version.Parse(es.Spec.Version)
if err != nil {
return results.WithError(err), status
}
if v.LT(filesettings.FileBasedSettingsMinPreVersion) {
err = fmt.Errorf("invalid version to configure resource Elasticsearch %s/%s: actual %s, expected >= %s", es.Namespace, es.Name, v, filesettings.FileBasedSettingsMinVersion)
r.recorder.Eventf(&policy, corev1.EventTypeWarning, events.EventReasonUnexpected, err.Error())
results.WithError(err)
err = status.AddPolicyErrorFor(esNsn, policyv1alpha1.ErrorPhase, err.Error(), policyv1alpha1.ElasticsearchResourceType)
if err != nil {
return results.WithError(err), status
}
continue
}
// the file settings Secret must exist; if it doesn't yet, it is created empty by the ES controller
var actualSettingsSecret corev1.Secret
err = r.Client.Get(ctx, types.NamespacedName{Namespace: es.Namespace, Name: esv1.FileSettingsSecretName(es.Name)}, &actualSettingsSecret)
if err != nil && apierrors.IsNotFound(err) {
// requeue if the Secret has not been created yet
return results.WithResult(defaultRequeue), status
}
if err != nil {
return results.WithError(err), status
}
// check that there is no other policy that already owns the Settings Secret
currentOwner, ok := filesettings.CanBeOwnedBy(actualSettingsSecret, policy)
if !ok {
err = fmt.Errorf("conflict: resource Elasticsearch %s/%s already configured by StackConfigpolicy %s/%s", es.Namespace, es.Name, currentOwner.Namespace, currentOwner.Name)
r.recorder.Eventf(&policy, corev1.EventTypeWarning, events.EventReasonUnexpected, err.Error())
results.WithError(err)
err = status.AddPolicyErrorFor(esNsn, policyv1alpha1.ConflictPhase, err.Error(), policyv1alpha1.ElasticsearchResourceType)
if err != nil {
return results.WithError(err), status
}
continue
}
// create the expected Settings Secret
expectedSecret, expectedVersion, err := filesettings.NewSettingsSecretWithVersion(esNsn, &actualSettingsSecret, &policy)
if err != nil {
return results.WithError(err), status
}
if err := filesettings.ReconcileSecret(ctx, r.Client, expectedSecret, &es); err != nil {
return results.WithError(err), status
}
// Copy all the Secrets that are present in spec.elasticsearch.secretMounts
if err := reconcileSecretMounts(ctx, r.Client, es, &policy); err != nil {
if apierrors.IsNotFound(err) {
// a referenced Secret does not exist (yet): report the error in the status and requeue
err = status.AddPolicyErrorFor(esNsn, policyv1alpha1.ErrorPhase, err.Error(), policyv1alpha1.ElasticsearchResourceType)
if err != nil {
return results.WithError(err), status
}
results.WithResult(defaultRequeue)
continue
}
return results.WithError(err), status
}
// create the expected Elasticsearch config Secret
expectedConfigSecret, err := newElasticsearchConfigSecret(policy, es)
if err != nil {
return results.WithError(err), status
}
if err = filesettings.ReconcileSecret(ctx, r.Client, expectedConfigSecret, &es); err != nil {
return results.WithError(err), status
}
// Check if required Elasticsearch config and secret mounts are applied.
configAndSecretMountsApplied, err := elasticsearchConfigAndSecretMountsApplied(ctx, r.Client, policy, es)
if err != nil {
return results.WithError(err), status
}
// call the /_cluster/state API to retrieve the file-based settings currently configured in ES
currentSettings, err := r.getClusterStateFileSettings(ctx, es)
if err != nil {
err = status.AddPolicyErrorFor(esNsn, policyv1alpha1.UnknownPhase, err.Error(), policyv1alpha1.ElasticsearchResourceType)
if err != nil {
return results.WithError(err), status
}
// requeue if ES not reachable
results.WithResult(defaultRequeue)
}
// update the ES resource status for this ES
err = status.UpdateResourceStatusPhase(esNsn, newElasticsearchResourceStatus(currentSettings, expectedVersion), configAndSecretMountsApplied, policyv1alpha1.ElasticsearchResourceType)
if err != nil {
return results.WithError(err), status
}
}
// Add dynamic watches on the additional secret mounts.
// This also removes dynamic watches for secrets that are no longer referenced in the StackConfigPolicy.
if err = r.addDynamicWatchesOnAdditionalSecretMounts(policy); err != nil {
return results.WithError(err), status
}
// reset/delete Settings secrets for resources no longer selected by this policy
results.WithError(handleOrphanSoftOwnedSecrets(ctx, r.Client, k8s.ExtractNamespacedName(&policy), configuredResources, nil, policyv1alpha1.ElasticsearchResourceType))
return results, status
}
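// reconcileKibanaResources configures the Kibana instances selected by the policy resource selector: for each
// instance it reconciles the Kibana config Secret and updates the policy status depending on whether the expected
// config is applied.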
func (r *ReconcileStackConfigPolicy) reconcileKibanaResources(ctx context.Context, policy policyv1alpha1.StackConfigPolicy, status policyv1alpha1.StackConfigPolicyStatus) (*reconciler.Results, policyv1alpha1.StackConfigPolicyStatus) {
defer tracing.Span(&ctx)()
log := ulog.FromContext(ctx)
log.V(1).Info("Reconcile Kibana Resources")
results := reconciler.NewResult(ctx)
// prepare the selector to find Kibana resources to configure
selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
MatchLabels: policy.Spec.ResourceSelector.MatchLabels,
MatchExpressions: policy.Spec.ResourceSelector.MatchExpressions,
})
if err != nil {
return results.WithError(err), status
}
listOpts := client.ListOptions{LabelSelector: selector}
// restrict the search to the policy namespace if it is different from the operator namespace
if policy.Namespace != r.params.OperatorNamespace {
listOpts.Namespace = policy.Namespace
}
// find the list of Kibana to configure
var kibanaList kibanav1.KibanaList
if err := r.Client.List(ctx, &kibanaList, &listOpts); err != nil {
return results.WithError(err), status
}
configuredResources := kbMap{}
for _, kibana := range kibanaList.Items {
log.V(1).Info("Reconcile StackConfigPolicy", "kibana_namespace", kibana.Namespace, "kibana_name", kibana.Name)
kibana := kibana
// keep the list of Kibana to be configured
kibanaNsn := k8s.ExtractNamespacedName(&kibana)
// check that there is no other policy that already owns the kibana config secret
currentOwner, ok, err := canBeOwned(ctx, r.Client, policy, kibana)
if err != nil {
return results.WithError(err), status
}
// record error if already owned by another stack config policy
if !ok {
err := fmt.Errorf("conflict: resource Kibana %s/%s already configured by StackConfigpolicy %s/%s", kibana.Namespace, kibana.Name, currentOwner.Namespace, currentOwner.Name)
r.recorder.Eventf(&policy, corev1.EventTypeWarning, events.EventReasonUnexpected, err.Error())
results.WithError(err)
if err := status.AddPolicyErrorFor(kibanaNsn, policyv1alpha1.ConflictPhase, err.Error(), policyv1alpha1.KibanaResourceType); err != nil {
return results.WithError(err), status
}
continue
}
// Create the Secret that holds the Kibana configuration.
if policy.Spec.Kibana.Config != nil {
// Only add to configured resources if Kibana config is set.
// This will help clean up the config secret if config gets removed from the stack config policy.
configuredResources[kibanaNsn] = kibana
expectedConfigSecret, err := newKibanaConfigSecret(policy, kibana)
if err != nil {
return results.WithError(err), status
}
if err = filesettings.ReconcileSecret(ctx, r.Client, expectedConfigSecret, &kibana); err != nil {
return results.WithError(err), status
}
}
// Check if required Kibana configs are applied.
configApplied, err := kibanaConfigApplied(r.Client, policy, kibana)
if err != nil {
return results.WithError(err), status
}
// update the Kibana resource status for this Kibana
err = status.UpdateResourceStatusPhase(kibanaNsn, policyv1alpha1.ResourcePolicyStatus{}, configApplied, policyv1alpha1.KibanaResourceType)
if err != nil {
return results.WithError(err), status
}
}
// delete the config Secrets of the Kibana resources no longer selected by this policy
results.WithError(deleteOrphanSoftOwnedSecrets(ctx, r.Client, k8s.ExtractNamespacedName(&policy), nil, configuredResources, policyv1alpha1.KibanaResourceType))
return results, status
}
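// newElasticsearchResourceStatus builds the status of an Elasticsearch resource from the file settings currently
// reported in its cluster state and the settings version expected by the policy, including any processing errors
// reported by Elasticsearch.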
func newElasticsearchResourceStatus(currentSettings esclient.FileSettings, expectedVersion int64) policyv1alpha1.ResourcePolicyStatus {
status := policyv1alpha1.ResourcePolicyStatus{
CurrentVersion: currentSettings.Version,
ExpectedVersion: expectedVersion,
}
if currentSettings.Errors != nil {
status.Error = policyv1alpha1.PolicyStatusError{
Version: currentSettings.Errors.Version,
Message: cleanStackTrace(currentSettings.Errors.Errors),
}
}
return status
}
var (
	matchTabsAtSpaces         = regexp.MustCompile(`\t+at\s`)
	matchTripleDotsNumberMore = regexp.MustCompile(`\.\.\. [0-9]+ more`)
)
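// cleanStackTrace removes Java stack trace frames ("\tat ..." lines and "... N more" continuations) from the given
// error messages and joins the cleaned messages with ". ", so that only the meaningful error lines end up in the
// policy status. For example, a (hypothetical) Elasticsearch error such as
//
//	java.lang.IllegalArgumentException: unknown setting [foo]
//		at org.elasticsearch...
//		... 12 more
//
// is reduced to "java.lang.IllegalArgumentException: unknown setting [foo]".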
func cleanStackTrace(errors []string) string {
for i, e := range errors {
var msg []string
for _, line := range strings.Split(e, "\n") {
if matchTabsAtSpaces.MatchString(line) || matchTripleDotsNumberMore.MatchString(line) {
continue
}
msg = append(msg, line)
}
errors[i] = strings.Trim(strings.Join(msg, "\n"), "\n")
}
return strings.Join(errors, ". ")
}
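// validate runs the StackConfigPolicy create validations, to cover the case where the validating webhook is disabled.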
func (r *ReconcileStackConfigPolicy) validate(ctx context.Context, policy *policyv1alpha1.StackConfigPolicy) error {
span, vctx := apm.StartSpan(ctx, "validate", tracing.SpanTypeApp)
defer span.End()
if _, err := policy.ValidateCreate(); err != nil {
ulog.FromContext(ctx).Error(err, "Validation failed")
k8s.MaybeEmitErrorEvent(r.recorder, err, policy, events.EventReasonValidation, err.Error())
return tracing.CaptureError(vctx, err)
}
return nil
}
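// updateStatus persists the given status on the StackConfigPolicy if it changed, emitting a warning event when the
// policy health has degraded.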
func (r *ReconcileStackConfigPolicy) updateStatus(ctx context.Context, scp policyv1alpha1.StackConfigPolicy, status policyv1alpha1.StackConfigPolicyStatus) error {
span, _ := apm.StartSpan(ctx, "update_status", tracing.SpanTypeApp)
defer span.End()
if reflect.DeepEqual(status, scp.Status) {
return nil // nothing to do
}
if status.IsDegraded(scp.Status) {
r.recorder.Event(&scp, corev1.EventTypeWarning, events.EventReasonUnhealthy, "StackConfigPolicy health degraded")
}
ulog.FromContext(ctx).V(1).Info("Updating status",
"iteration", atomic.LoadUint64(&r.iteration),
"status", status,
)
scp.Status = status
return common.UpdateStatus(ctx, r.Client, &scp)
}
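// onDelete cleans up after a deleted StackConfigPolicy: it removes the dynamic watches registered for its additional
// secret mounts and resets or deletes all the Secrets it soft-owns.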
func (r *ReconcileStackConfigPolicy) onDelete(ctx context.Context, obj types.NamespacedName) error {
defer tracing.Span(&ctx)()
// Remove dynamic watches on secrets
r.dynamicWatches.Secrets.RemoveHandlerForKey(additionalSecretMountsWatcherName(obj))
// pass an empty resource type so that secrets soft-owned for both Elasticsearch and Kibana resources are reset/deleted
return handleOrphanSoftOwnedSecrets(ctx, r.Client, obj, nil, nil, "")
}
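// handleOrphanSoftOwnedSecrets resets or deletes the Secrets soft-owned by the given policy that belong to resources
// no longer configured by it, depending on the on-delete behavior encoded in each Secret's labels.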
func handleOrphanSoftOwnedSecrets(
ctx context.Context,
c k8s.Client,
softOwner types.NamespacedName,
configuredESResources esMap,
configuredKibanaResources kbMap,
resourceType policyv1alpha1.ResourceType,
) error {
err := resetOrphanSoftOwnedFileSettingSecrets(ctx, c, softOwner, configuredESResources, resourceType)
if err != nil {
return err
}
return deleteOrphanSoftOwnedSecrets(ctx, c, softOwner, configuredESResources, configuredKibanaResources, resourceType)
}
// resetOrphanSoftOwnedFileSettingSecrets resets the file settings Secrets of the Elasticsearch clusters that are no
// longer configured by the given StackConfigPolicy.
// An optional map of the Elasticsearch clusters currently configured by the policy can be provided to exclude their
// Secrets from the reset. Without it, all Secrets soft-owned by the policy are reset.
func resetOrphanSoftOwnedFileSettingSecrets(
ctx context.Context,
c k8s.Client,
softOwner types.NamespacedName,
configuredESResources esMap,
resourceType policyv1alpha1.ResourceType,
) error {
log := ulog.FromContext(ctx)
var secrets corev1.SecretList
matchLabels := client.MatchingLabels{
reconciler.SoftOwnerNamespaceLabel: softOwner.Namespace,
reconciler.SoftOwnerNameLabel: softOwner.Name,
reconciler.SoftOwnerKindLabel: policyv1alpha1.Kind,
commonlabels.StackConfigPolicyOnDeleteLabelName: commonlabels.OrphanSecretResetOnPolicyDelete,
}
if resourceType != "" {
matchLabels[commonv1.TypeLabelName] = string(resourceType)
}
if err := c.List(ctx,
&secrets,
// search in all namespaces
// restrict to secrets on which we set the soft owner labels
matchLabels,
); err != nil {
return err
}
for i := range secrets.Items {
s := secrets.Items[i]
configuredApplicationType := s.Labels[commonv1.TypeLabelName]
switch configuredApplicationType {
case eslabel.Type:
namespacedName := types.NamespacedName{
Namespace: s.Namespace,
Name: s.Labels[eslabel.ClusterNameLabelName],
}
if _, exists := configuredESResources[namespacedName]; exists {
continue
}
log.V(1).Info("Reconcile empty file settings Secret for Elasticsearch",
"es_namespace", namespacedName.Namespace, "es_name", namespacedName.Name,
"owner_namespace", softOwner.Namespace, "owner_name", softOwner.Name)
var es esv1.Elasticsearch
err := c.Get(ctx, namespacedName, &es)
if err != nil && !apierrors.IsNotFound(err) {
return err
}
if apierrors.IsNotFound(err) {
// the Elasticsearch cluster has just been deleted: nothing to reset, move on to the next secret
continue
}
if err := filesettings.ReconcileEmptyFileSettingsSecret(ctx, c, es, false); err != nil {
return err
}
case kblabel.Type:
// Kibana config secrets are only ever labeled for deletion on policy delete, not for reset, so this case
// should not be reached. Implement if needed in the future.
continue
default:
return fmt.Errorf("secret configured for unknown application type %s", configuredApplicationType)
}
}
return nil
}
// deleteOrphanSoftOwnedSecrets deletes secrets for the Elasticsearch/Kibana clusters that are no longer configured
// by a given StackConfigPolicy.
func deleteOrphanSoftOwnedSecrets(
ctx context.Context,
c k8s.Client,
softOwner types.NamespacedName,
configuredESResources esMap,
configuredKibanaResources kbMap,
resourceType policyv1alpha1.ResourceType,
) error {
var secrets corev1.SecretList
matchLabels := client.MatchingLabels{
reconciler.SoftOwnerNamespaceLabel: softOwner.Namespace,
reconciler.SoftOwnerNameLabel: softOwner.Name,
reconciler.SoftOwnerKindLabel: policyv1alpha1.Kind,
commonlabels.StackConfigPolicyOnDeleteLabelName: commonlabels.OrphanSecretDeleteOnPolicyDelete,
}
if resourceType != "" {
matchLabels[commonv1.TypeLabelName] = string(resourceType)
}
if err := c.List(ctx,
&secrets,
// search in all namespaces
// restrict to secrets on which we set the soft owner labels
matchLabels,
); err != nil {
return err
}
for i := range secrets.Items {
secret := secrets.Items[i]
configuredApplicationType := secret.Labels[commonv1.TypeLabelName]
switch configuredApplicationType {
case eslabel.Type:
namespacedName := types.NamespacedName{
Namespace: secret.Namespace,
Name: secret.Labels[eslabel.ClusterNameLabelName],
}
// check if they exist in the es map
if _, exist := configuredESResources[namespacedName]; exist {
continue
}
case kblabel.Type:
namespacedName := types.NamespacedName{
Namespace: secret.Namespace,
Name: secret.Labels[kblabel.KibanaNameLabelName],
}
// check if they exist in the kb map
if _, exist := configuredKibanaResources[namespacedName]; exist {
continue
}
default:
return fmt.Errorf("secret configured for unknown application type %s", configuredApplicationType)
}
// the given Elasticsearch/Kibana resource is no longer managed by the StackConfigPolicy: delete its secret
err := c.Delete(ctx, &secret)
if err != nil && !apierrors.IsNotFound(err) {
return err
}
}
return nil
}
// getClusterStateFileSettings gets the file based settings currently configured in an Elasticsearch by calling the /_cluster/state API.
func (r *ReconcileStackConfigPolicy) getClusterStateFileSettings(ctx context.Context, es esv1.Elasticsearch) (esclient.FileSettings, error) {
span, _ := apm.StartSpan(ctx, "get_cluster_state", tracing.SpanTypeApp)
defer span.End()
esClient, err := r.esClientProvider(ctx, r.Client, r.params.Dialer, es)
if err != nil {
return esclient.FileSettings{}, err
}
defer esClient.Close()
clusterState, err := esClient.GetClusterState(ctx)
if err != nil {
return esclient.FileSettings{}, err
}
return clusterState.Metadata.ReservedState.FileSettings, nil
}
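// addDynamicWatchesOnAdditionalSecretMounts (re)registers the dynamic watches for the Secrets referenced in
// spec.elasticsearch.secretMounts, so that the policy is reconciled whenever one of these Secrets changes. Secrets
// no longer referenced by the policy are removed from the watch.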
func (r *ReconcileStackConfigPolicy) addDynamicWatchesOnAdditionalSecretMounts(policy policyv1alpha1.StackConfigPolicy) error {
// Add watches if there are additional secrets to be mounted
watcher := types.NamespacedName{
Name: policy.Name,
Namespace: policy.Namespace,
}
var secretSources []commonv1.NamespacedSecretSource //nolint:prealloc
for _, secretMount := range policy.Spec.Elasticsearch.SecretMounts {
secretSources = append(secretSources, commonv1.NamespacedSecretSource{
SecretName: secretMount.SecretName,
Namespace: policy.Namespace,
})
}
// Add dynamic watches on the secrets
return watches.WatchUserProvidedNamespacedSecrets(
watcher,
r.dynamicWatches,
additionalSecretMountsWatcherName(watcher),
secretSources,
)
}
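// additionalSecretMountsWatcherName returns the name of the dynamic watch registered for a policy's additional
// secret mounts, e.g. "my-policy-my-namespace-additional-secret-mounts-watcher" for a (hypothetical) policy named
// "my-policy" in the "my-namespace" namespace.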
func additionalSecretMountsWatcherName(watcher types.NamespacedName) string {
return fmt.Sprintf("%s-%s-additional-secret-mounts-watcher", watcher.Name, watcher.Namespace)
}