pkg/controller/kibana/driver.go (295 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package kibana
import (
"context"
"fmt"
"hash/fnv"
pkgerrors "github.com/pkg/errors"
"go.elastic.co/apm/v2"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
kbv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/kibana/v1"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/association"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common"
commonassociation "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/association"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/certificates"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/defaults"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/deployment"
driver2 "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/driver"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/events"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/keystore"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/operator"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/reconciler"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/tracing"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/version"
commonvolume "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/volume"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/watches"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/kibana/initcontainer"
kblabel "github.com/elastic/cloud-on-k8s/v3/pkg/controller/kibana/label"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/kibana/network"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/kibana/stackmon"
"github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s"
ulog "github.com/elastic/cloud-on-k8s/v3/pkg/utils/log"
"github.com/elastic/cloud-on-k8s/v3/pkg/utils/maps"
)
// minSupportedVersion is the minimum version of Kibana supported by ECK. Currently this is set to version 7.0.0.
var minSupportedVersion = version.From(7, 0, 0)
type driver struct {
client k8s.Client
dynamicWatches watches.DynamicWatches
recorder record.EventRecorder
version version.Version
ipFamily corev1.IPFamily
}
func (d *driver) DynamicWatches() watches.DynamicWatches {
return d.dynamicWatches
}
func (d *driver) K8sClient() k8s.Client {
return d.client
}
func (d *driver) Recorder() record.EventRecorder {
return d.recorder
}
var _ driver2.Interface = &driver{}
func newDriver(
client k8s.Client,
watches watches.DynamicWatches,
recorder record.EventRecorder,
kb *kbv1.Kibana,
ipFamily corev1.IPFamily,
) (*driver, error) {
ver, err := version.Parse(kb.Spec.Version)
if err != nil {
k8s.MaybeEmitErrorEvent(recorder, err, kb, events.EventReasonValidation, "Invalid version '%s': %v", kb.Spec.Version, err)
return nil, err
}
if !ver.GTE(minSupportedVersion) {
err := pkgerrors.Errorf("unsupported Kibana version: %s", ver)
k8s.MaybeEmitErrorEvent(recorder, err, kb, events.EventReasonValidation, "Unsupported Kibana version")
return nil, err
}
return &driver{
client: client,
dynamicWatches: watches,
recorder: recorder,
version: ver,
ipFamily: ipFamily,
}, nil
}
func (d *driver) Reconcile(
ctx context.Context,
state *State,
kb *kbv1.Kibana,
params operator.Parameters,
) *reconciler.Results {
results := reconciler.NewResult(ctx)
isEsAssocConfigured, err := association.IsConfiguredIfSet(ctx, kb.EsAssociation(), d.recorder)
if err != nil {
return results.WithError(err)
}
if !isEsAssocConfigured {
return results
}
isEntAssocConfigured, err := association.IsConfiguredIfSet(ctx, kb.EntAssociation(), d.recorder)
if err != nil {
return results.WithError(err)
}
if !isEntAssocConfigured {
return results
}
svc, err := common.ReconcileService(ctx, d.client, NewService(*kb), kb)
if err != nil {
// TODO: consider updating some status here?
return results.WithError(err)
}
_, results = certificates.Reconciler{
K8sClient: d.K8sClient(),
DynamicWatches: d.DynamicWatches(),
Owner: kb,
TLSOptions: kb.Spec.HTTP.TLS,
Namer: kbv1.KBNamer,
Labels: kb.GetIdentityLabels(),
Services: []corev1.Service{*svc},
GlobalCA: params.GlobalCA,
CACertRotation: params.CACertRotation,
CertRotation: params.CertRotation,
GarbageCollectSecrets: true,
}.ReconcileCAAndHTTPCerts(ctx)
if results.HasError() {
_, err := results.Aggregate()
k8s.MaybeEmitErrorEvent(d.Recorder(), err, kb, events.EventReconciliationError, "Certificate reconciliation error: %v", err)
return results
}
logger := ulog.FromContext(ctx)
assocAllowed, err := association.AllowVersion(d.version, kb, logger, d.Recorder())
if err != nil {
return results.WithError(err)
}
if !assocAllowed {
return results // will eventually retry
}
kibanaPolicyCfg, err := getPolicyConfig(ctx, d.client, *kb)
if err != nil {
return results.WithError(err)
}
kbSettings, err := NewConfigSettings(ctx, d.client, *kb, d.version, d.ipFamily, kibanaPolicyCfg.KibanaConfig)
if err != nil {
return results.WithError(err)
}
if err = ReconcileConfigSecret(ctx, d.client, *kb, kbSettings); err != nil {
return results.WithError(err)
}
basePath, err := GetKibanaBasePath(*kb)
if err != nil {
return results.WithError(err)
}
if err = stackmon.ReconcileConfigSecrets(ctx, d.client, *kb, basePath); err != nil {
return results.WithError(err)
}
if err = initcontainer.ReconcileScriptsConfigMap(ctx, d.client, *kb); err != nil {
return results.WithError(err)
}
span, _ := apm.StartSpan(ctx, "reconcile_deployment", tracing.SpanTypeApp)
defer span.End()
deploymentParams, err := d.deploymentParams(ctx, kb, kibanaPolicyCfg.PodAnnotations, basePath, params.SetDefaultSecurityContext)
if err != nil {
return results.WithError(err)
}
expectedDp := deployment.New(deploymentParams)
reconciledDp, err := deployment.Reconcile(ctx, d.client, expectedDp, kb)
if err != nil {
return results.WithError(err)
}
existingPods, err := k8s.PodsMatchingLabels(d.K8sClient(), kb.Namespace, map[string]string{kblabel.KibanaNameLabelName: kb.Name})
if err != nil {
return results.WithError(err)
}
deploymentStatus, err := common.DeploymentStatus(ctx, state.Kibana.Status.DeploymentStatus, reconciledDp, existingPods, kblabel.KibanaVersionLabelName)
if err != nil {
return results.WithError(err)
}
state.Kibana.Status.DeploymentStatus = deploymentStatus
return results
}
// getStrategyType decides which deployment strategy (RollingUpdate or Recreate) to use based on whether the version
// upgrade is in progress. Kibana does not support a smooth rolling upgrade from one version to another:
// running multiple versions simultaneously may lead to concurrency bugs and data corruption.
func (d *driver) getStrategyType(kb *kbv1.Kibana) (appsv1.DeploymentStrategyType, error) {
var pods corev1.PodList
var labels client.MatchingLabels = map[string]string{kblabel.KibanaNameLabelName: kb.Name}
if err := d.client.List(context.Background(), &pods, client.InNamespace(kb.Namespace), labels); err != nil {
return "", err
}
for _, pod := range pods.Items {
ver, ok := pod.Labels[kblabel.KibanaVersionLabelName]
// if label is missing we assume that the last reconciliation was done by previous version of the operator
// to be safe, we assume the Kibana version has changed when operator was offline and use Recreate,
// otherwise we may run into data corruption/data loss.
if !ok || ver != kb.Spec.Version {
return appsv1.RecreateDeploymentStrategyType, nil
}
}
return appsv1.RollingUpdateDeploymentStrategyType, nil
}
func (d *driver) deploymentParams(ctx context.Context, kb *kbv1.Kibana, policyAnnotations map[string]string, basePath string, setDefaultSecurityContext bool) (deployment.Params, error) {
initContainersParameters, err := initcontainer.NewInitContainersParameters(kb)
if err != nil {
return deployment.Params{}, err
}
// setup a keystore with secure settings in an init container, if specified by the user
keystoreResources, err := keystore.ReconcileResources(
ctx,
d,
kb,
kbv1.KBNamer,
kb.GetIdentityLabels(),
initContainersParameters,
)
if err != nil {
return deployment.Params{}, err
}
volumes, err := d.buildVolumes(kb)
if err != nil {
return deployment.Params{}, err
}
kibanaPodSpec, err := NewPodTemplateSpec(ctx, d.client, *kb, keystoreResources, volumes, basePath, setDefaultSecurityContext)
if err != nil {
return deployment.Params{}, err
}
// Build a checksum of the configuration, which we can use to cause the Deployment to roll Kibana
// instances in case of any change in the CA file, secure settings or credentials contents.
// This is done because Kibana does not support updating those without restarting the process.
configHash := fnv.New32a()
if keystoreResources != nil {
_, _ = configHash.Write([]byte(keystoreResources.Hash))
}
// we need to deref the secret here to include it in the checksum otherwise Kibana will not be rolled on contents changes
if err := commonassociation.WriteAssocsToConfigHash(d.client, kb.GetAssociations(), configHash); err != nil {
return deployment.Params{}, err
}
if kb.Spec.HTTP.TLS.Enabled() {
// fetch the secret to calculate the checksum
var httpCerts corev1.Secret
err := d.client.Get(ctx, types.NamespacedName{
Namespace: kb.Namespace,
Name: certificates.InternalCertsSecretName(kbv1.KBNamer, kb.Name),
}, &httpCerts)
if err != nil {
return deployment.Params{}, err
}
if httpCert, ok := httpCerts.Data[certificates.CertFileName]; ok {
_, _ = configHash.Write(httpCert)
}
}
// get config secret to add its content to the config checksum
configSecret := corev1.Secret{}
err = d.client.Get(ctx, types.NamespacedName{Name: kbv1.ConfigSecret(kb.Name), Namespace: kb.Namespace}, &configSecret)
if err != nil {
return deployment.Params{}, err
}
_, _ = configHash.Write(configSecret.Data[SettingsFilename])
// add the checksum to an annotation for the deployment and its pods (the important bit is that the pod template
// changes, which will trigger a rolling update)
kibanaPodSpec.Annotations[configHashAnnotationName] = fmt.Sprint(configHash.Sum32())
// add additional annotations related to the StackConfigPolicy
kibanaPodSpec.Annotations = maps.Merge(kibanaPodSpec.Annotations, policyAnnotations)
// decide the strategy type
strategyType, err := d.getStrategyType(kb)
if err != nil {
return deployment.Params{}, err
}
return deployment.Params{
Name: kbv1.KBNamer.Suffix(kb.Name),
Namespace: kb.Namespace,
Replicas: kb.Spec.Count,
Selector: kb.GetIdentityLabels(),
Labels: kb.GetIdentityLabels(),
PodTemplateSpec: kibanaPodSpec,
RevisionHistoryLimit: kb.Spec.RevisionHistoryLimit,
Strategy: appsv1.DeploymentStrategy{Type: strategyType},
}, nil
}
func (d *driver) buildVolumes(kb *kbv1.Kibana) ([]commonvolume.VolumeLike, error) {
volumes := []commonvolume.VolumeLike{DataVolume, initcontainer.ConfigSharedVolume, initcontainer.ConfigVolume(*kb)}
esAssocConf, err := kb.EsAssociation().AssociationConf()
if err != nil {
return nil, err
}
if esAssocConf.CAIsConfigured() {
esCertsVolume := esCaCertSecretVolume(*esAssocConf)
volumes = append(volumes, esCertsVolume)
}
entAssocConf, err := kb.EntAssociation().AssociationConf()
if err != nil {
return nil, err
}
if entAssocConf.CAIsConfigured() {
entCertsVolume := entCaCertSecretVolume(*entAssocConf)
volumes = append(volumes, entCertsVolume)
}
if kb.Spec.HTTP.TLS.Enabled() {
httpCertsVolume := certificates.HTTPCertSecretVolume(kbv1.KBNamer, kb.Name)
volumes = append(volumes, httpCertsVolume)
}
return volumes, nil
}
func NewService(kb kbv1.Kibana) *corev1.Service {
svc := corev1.Service{
ObjectMeta: kb.Spec.HTTP.Service.ObjectMeta,
Spec: kb.Spec.HTTP.Service.Spec,
}
svc.ObjectMeta.Namespace = kb.Namespace
svc.ObjectMeta.Name = kbv1.HTTPService(kb.Name)
labels := kb.GetIdentityLabels()
ports := []corev1.ServicePort{
{
Name: kb.Spec.HTTP.Protocol(),
Protocol: corev1.ProtocolTCP,
Port: network.HTTPPort,
},
}
return defaults.SetServiceDefaults(&svc, labels, labels, ports)
}