pkg/controller/license/license_controller.go (221 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package license
import (
"context"
"encoding/json"
"errors"
"time"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
commonv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/common/v1"
esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/events"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/license"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/operator"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/reconciler"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/tracing"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/version"
esclient "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/client"
eslabel "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/label"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/sset"
"github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s"
ulog "github.com/elastic/cloud-on-k8s/v3/pkg/utils/log"
)
const (
	// name identifies this controller; it is used when creating the controller, its event recorder
	// and its reconciliation context.
	name = "license-controller"

	// minimumRetryInterval is the requeue delay used when there is no license expiry to schedule against.
	minimumRetryInterval = time.Hour

	// defaultSafetyMargin is how far ahead of expiry this controller aims to refresh licenses, so that
	// clusters keep enough runway on their current license if the controller runs into operational issues.
	defaultSafetyMargin = 30 * 24 * time.Hour
)
// Reconcile is the entry point for reconciling the license of the Elasticsearch cluster named in request.
// It sets up the reconciliation context (logging and tracing), delegates the actual work to
// reconcileInternal and returns the aggregated outcome.
// The aggregated result carries the requeue instructions computed from the license expiry, so a new
// reconciliation is scheduled independently of any watch-triggered request.
func (r *ReconcileLicenses) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
	ctx = common.NewReconciliationContext(ctx, &r.iteration, r.Tracer, name, "es_name", request)
	// deferred calls run in reverse order: the tracing transaction is ended before the run is logged
	defer common.LogReconciliationRun(ulog.FromContext(ctx))()
	defer tracing.EndContextTransaction(ctx)

	result, err := r.reconcileInternal(ctx, request).Aggregate()
	ulog.FromContext(ctx).V(1).Info(
		"Reconcile result",
		"requeue", result.Requeue,
		"requeueAfter", result.RequeueAfter,
	)
	return result, err
}
// Add builds the license reconciler, registers it with the Manager as a new controller and installs the
// watches that trigger it. The Manager starts the controller when the Manager itself is started.
func Add(mgr manager.Manager, p operator.Parameters) error {
	licenseReconciler := newReconciler(mgr, p)
	ctrl, err := common.NewController(mgr, name, licenseReconciler, p)
	if err != nil {
		return err
	}
	return addWatches(mgr, ctrl, licenseReconciler.Client)
}
// newReconciler assembles a ReconcileLicenses from the manager's client and event recorder, together with
// a license checker scoped to the operator namespace given in params.
func newReconciler(mgr manager.Manager, params operator.Parameters) *ReconcileLicenses {
	k8sClient := mgr.GetClient()
	checker := license.NewLicenseChecker(k8sClient, params.OperatorNamespace)
	recorder := mgr.GetEventRecorderFor(name)
	return &ReconcileLicenses{
		Client:     k8sClient,
		Parameters: params,
		checker:    checker,
		recorder:   recorder,
	}
}
// nextReconcile computes when the next reconciliation should happen for a license expiring at expiry,
// measured from the current wall-clock time. See nextReconcileRelativeTo for the scheduling rules.
func nextReconcile(expiry time.Time, safety time.Duration) reconcile.Result {
	now := time.Now()
	return nextReconcileRelativeTo(now, expiry, safety)
}
// nextReconcileRelativeTo computes the requeue instruction for a license expiring at expiry, as seen from now:
//   - a zero expiry yields a requeue after minimumRetryInterval,
//   - otherwise the reconciliation is scheduled at expiry minus half of safety, so that the next attempt
//     falls inside the safety margin and actually reissues a license,
//   - if that point in time is now or already in the past, an immediate requeue is requested.
func nextReconcileRelativeTo(now, expiry time.Time, safety time.Duration) reconcile.Result {
	if expiry.IsZero() {
		// no expiry to schedule against: fall back to the default retry interval
		return reconcile.Result{RequeueAfter: minimumRetryInterval}
	}

	renewAt := expiry.Add(-(safety / 2))
	wait := renewAt.Sub(now)
	if wait > 0 {
		return reconcile.Result{RequeueAfter: wait}
	}
	return reconcile.Result{Requeue: true}
}
// addWatches registers the event sources of the license controller on c:
//   - Elasticsearch resources, each event enqueuing a request for the affected cluster,
//   - Secrets that are operator licenses, each event enqueuing requests for all Elasticsearch clusters.
func addWatches(mgr manager.Manager, c controller.Controller, k8sClient k8s.Client) error {
	log := ulog.Log // no context available for contextual logging

	// Watch for changes to Elasticsearch clusters.
	esSource := source.Kind(mgr.GetCache(), &esv1.Elasticsearch{}, &handler.TypedEnqueueRequestForObject[*esv1.Elasticsearch]{})
	if err := c.Watch(esSource); err != nil {
		return err
	}

	// If a license is added/modified we want to update potentially all clusters managed by this instance
	// of ECK, which is why all Elasticsearch clusters are listed here to trigger their reconciliation.
	licenseSecretToRequests := func(ctx context.Context, secret *corev1.Secret) []reconcile.Request {
		if !license.IsOperatorLicense(*secret) {
			return nil
		}
		requests, err := reconcileRequestsForAllClusters(k8sClient, log)
		if err != nil {
			// dropping the event(s) at this point
			log.Error(err, "failed to list affected clusters in enterprise license watch")
			return nil
		}
		return requests
	}
	secretSource := source.Kind(
		mgr.GetCache(),
		&corev1.Secret{},
		handler.TypedEnqueueRequestsFromMapFunc[*corev1.Secret](licenseSecretToRequests),
	)
	return c.Watch(secretSource)
}
// Compile-time check that *ReconcileLicenses implements reconcile.Reconciler.
var _ reconcile.Reconciler = (*ReconcileLicenses)(nil)

// ReconcileLicenses reconciles enterprise licenses with existing Elasticsearch clusters by maintaining a
// license secret for each cluster in the cluster's namespace.
type ReconcileLicenses struct {
	// Client is the Kubernetes client used to read clusters and manage license secrets.
	k8s.Client
	// Parameters holds the operator settings this controller was created with.
	operator.Parameters
	// iteration is the number of times this controller has run its Reconcile method
	iteration uint64
	// checker validates candidate enterprise licenses.
	checker license.Checker
	// recorder emits Kubernetes events, e.g. for invalid license objects.
	recorder record.EventRecorder
}
// findLicense looks up the enterprise licenses reachable through c and returns the best match for
// minVersion as selected by license.BestMatch, using checker to validate each candidate.
// License objects that could not be loaded are logged, reported as events and otherwise ignored.
// The results are those of license.BestMatch: the license, the name of its parent and whether one was found.
func (r *ReconcileLicenses) findLicense(ctx context.Context, c k8s.Client, checker license.Checker, minVersion *version.Version) (esclient.License, string, bool) {
	candidates, loadErrs := license.EnterpriseLicensesOrErrors(c)
	if len(loadErrs) != 0 {
		ulog.FromContext(ctx).Error(utilerrors.NewAggregate(loadErrs), "Ignoring invalid license objects")
		recordInvalidLicenseEvents(loadErrs, r.recorder)
	}

	return license.BestMatch(ctx, minVersion, candidates, func(l license.EnterpriseLicense) (bool, error) {
		return checker.Valid(ctx, l)
	})
}
// recordInvalidLicenseEvents emits a warning event on the source object of every error in errs that is
// (or wraps) a *license.Error. Errors of any other kind are skipped.
func recordInvalidLicenseEvents(errs []error, recorder record.EventRecorder) {
	for i := range errs {
		var licenseErr *license.Error
		if !errors.As(errs[i], &licenseErr) {
			continue
		}
		recorder.Event(licenseErr.Source, corev1.EventTypeWarning, events.EventReasonInvalidLicense, errs[i].Error())
	}
}
// reconcileSecret upserts, in the namespace of the given Elasticsearch cluster, the secret holding the
// JSON-encoded license for that cluster. The secret is labelled with the license type and scope and with
// the name of the parent license it was derived from.
// It returns an error if the license cannot be serialized or the secret cannot be reconciled.
func reconcileSecret(
	ctx context.Context,
	c k8s.Client,
	cluster esv1.Elasticsearch,
	parent string,
	esLicense esclient.License,
) error {
	secretName := esv1.LicenseSecretName(cluster.Name)
	licenseBytes, err := json.Marshal(esLicense)
	if err != nil {
		return err
	}

	labels := map[string]string{
		commonv1.TypeLabelName:    license.Type,
		license.LicenseLabelName:  parent,
		license.LicenseLabelScope: string(license.LicenseScopeElasticsearch),
		license.LicenseLabelType:  esLicense.Type,
	}
	meta := metav1.ObjectMeta{
		Name:      secretName,
		Namespace: cluster.Namespace,
		Labels:    labels,
	}
	expected := corev1.Secret{
		ObjectMeta: meta,
		Data:       map[string][]byte{license.FileName: licenseBytes},
	}

	// create/update a secret in the cluster's namespace containing the same data
	_, err = reconciler.ReconcileSecret(ctx, c, expected, &cluster)
	return err
}
// reconcileClusterLicense makes sure the given Elasticsearch cluster carries the best matching enterprise
// license: it upserts the cluster license secret when a license is found, and deletes that secret when
// none is found so that the cluster reverts to basic.
//
// It returns:
//   - the expiry time of the license applied to the cluster (the zero time if no license was applied),
//   - a bool that is true when no license is in effect for the cluster; the caller uses it to skip the
//     expiry safety margin when scheduling the next reconciliation,
//   - an error if the minimum version of the cluster could not be determined or the license secret could
//     not be created, updated or deleted.
//
// The bool is only meaningful when the returned error is nil.
func (r *ReconcileLicenses) reconcileClusterLicense(ctx context.Context, cluster esv1.Elasticsearch) (time.Time, bool, error) {
	log := ulog.FromContext(ctx)
	var noResult time.Time
	minVersion, err := r.minVersion(cluster)
	if err != nil {
		return noResult, true, err
	}
	matchingSpec, parent, found := r.findLicense(ctx, r, r.checker, minVersion)
	if !found {
		// no matching license found, delete cluster level license if it exists to revert to basic
		clusterLicenseNSN := types.NamespacedName{Namespace: cluster.Namespace, Name: esv1.LicenseSecretName(cluster.Name)}
		log.V(1).Info("No enterprise license found. Attempting to remove cluster license secret", "namespace", cluster.Namespace, "es_name", cluster.Name)
		err := k8s.DeleteSecretIfExists(ctx, r.Client, clusterLicenseNSN)
		// report the absence of a license so that the caller does not treat the zero expiry as a license expiry
		return noResult, true, err
	}
	log.V(1).Info("Found license for cluster", "eck_license", parent, "es_license", matchingSpec.UID, "license_type", matchingSpec.Type, "namespace", cluster.Namespace, "es_name", cluster.Name)
	// make sure the signature secret is created in the cluster's namespace
	if err := reconcileSecret(ctx, r, cluster, parent, matchingSpec); err != nil {
		return noResult, false, err
	}
	return matchingSpec.ExpiryTime(), false, nil
}
// minVersion returns the lowest Elasticsearch version found in the version label of the cluster's actual
// Pods. If the Pods yield no version, the version declared in the cluster spec is parsed and returned instead.
func (r *ReconcileLicenses) minVersion(cluster esv1.Elasticsearch) (*version.Version, error) {
	pods, err := sset.GetActualPodsForCluster(r, cluster)
	if err != nil {
		return nil, err
	}

	lowest, err := version.MinInPods(pods, eslabel.VersionLabelName)
	if err != nil {
		return nil, err
	}
	if lowest != nil {
		return lowest, nil
	}

	// no version could be derived from the Pods: fall back to the version in the spec
	specVersion, err := version.Parse(cluster.Spec.Version)
	if err != nil {
		return nil, err
	}
	return &specVersion, nil
}
// reconcileInternal performs the license reconciliation for the Elasticsearch cluster referenced by request.
// Clusters that no longer exist or are being deleted are skipped without error. Otherwise the cluster
// license is reconciled and the returned results carry either the error encountered or the requeue
// instruction derived from the license expiry.
func (r *ReconcileLicenses) reconcileInternal(ctx context.Context, request reconcile.Request) *reconciler.Results {
	res := &reconciler.Results{}

	// Fetch the cluster to ensure it still exists
	var cluster esv1.Elasticsearch
	switch err := r.Get(ctx, request.NamespacedName, &cluster); {
	case apierrors.IsNotFound(err):
		// nothing to do no cluster
		return res
	case err != nil:
		return res.WithError(err)
	}

	if !cluster.DeletionTimestamp.IsZero() {
		// cluster is being deleted nothing to do
		return res
	}

	expiry, noLicense, err := r.reconcileClusterLicense(ctx, cluster)
	if err != nil {
		return res.WithError(err)
	}

	safety := defaultSafetyMargin
	if noLicense {
		// don't apply safety margin if we don't have a license but use requested requeue time as specified in expiry
		safety = 0
	}
	return res.WithResult(nextReconcile(expiry, safety))
}