pkg/controller/apmserver/pod.go (153 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package apmserver import ( "fmt" "path/filepath" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/util/intstr" apmv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/apm/v1" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/certificates" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/container" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/defaults" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/keystore" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/version" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/volume" "github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s" ) const ( // HTTPPort is the (default) port used by ApmServer HTTPPort = DefaultHTTPPort SecretTokenKey string = "secret-token" DataVolumePath = ApmBaseDir + "/data" ConfigVolumePath = ApmBaseDir + "/config" ) var ( DefaultMemoryLimits = resource.MustParse("512Mi") DefaultResources = corev1.ResourceRequirements{ Requests: map[corev1.ResourceName]resource.Quantity{ corev1.ResourceMemory: DefaultMemoryLimits, }, Limits: map[corev1.ResourceName]resource.Quantity{ corev1.ResourceMemory: DefaultMemoryLimits, }, } ) // readinessProbe is the readiness probe for the APM Server container func readinessProbe(tls bool) corev1.Probe { scheme := corev1.URISchemeHTTP if tls { scheme = corev1.URISchemeHTTPS } return corev1.Probe{ FailureThreshold: 3, InitialDelaySeconds: 10, PeriodSeconds: 10, SuccessThreshold: 1, TimeoutSeconds: 5, ProbeHandler: corev1.ProbeHandler{ HTTPGet: &corev1.HTTPGetAction{ Port: intstr.FromInt(HTTPPort), Path: "/", Scheme: scheme, }, }, } } var args = []string{ // -e flag is implicit in containerised versions of APM server as they start the binary with the --environment=container flag. "-c", "config/config-secret/apm-server.yml", } const ( dataVolumeName = "apmserver-data" configVolumeName = "config-volume" ) var ( configVolume = volume.NewEmptyDirVolume(configVolumeName, ConfigVolumePath) // dataVolume is used to propagatee the keystore to the APM server from the keystore init container // and to hold metadata written by APM server (at least since 9.0) to avoid writing into the containers filesystem. // Given that APM server is stateless we should be OK with an emptyDir volume. dataVolume = volume.NewEmptyDirVolume(dataVolumeName, DataVolumePath) ) type PodSpecParams struct { Version string CustomImageName string PodTemplate corev1.PodTemplateSpec TokenSecret corev1.Secret ConfigSecret corev1.Secret keystoreResources *keystore.Resources } func newPodSpec(c k8s.Client, as *apmv1.ApmServer, p PodSpecParams) (corev1.PodTemplateSpec, error) { labels := as.GetIdentityLabels() labels[APMVersionLabelName] = p.Version // ensure the Pod gets rotated on config change configHash, err := buildConfigHash(c, as, p) if err != nil { return corev1.PodTemplateSpec{}, err } annotations := map[string]string{configHashAnnotationName: configHash} configSecretVolume := volume.NewSecretVolumeWithMountPath( p.ConfigSecret.Name, "config", filepath.Join(ConfigVolumePath, "config-secret"), ) env := defaults.ExtendPodDownwardEnvVars(corev1.EnvVar{ Name: "SECRET_TOKEN", ValueFrom: &corev1.EnvVarSource{ SecretKeyRef: &corev1.SecretKeySelector{ LocalObjectReference: corev1.LocalObjectReference{Name: p.TokenSecret.Name}, Key: SecretTokenKey, }, }, }) ports := getDefaultContainerPorts(*as) volumes := []corev1.Volume{configVolume.Volume(), configSecretVolume.Volume(), dataVolume.Volume()} volumeMounts := []corev1.VolumeMount{configVolume.VolumeMount(), configSecretVolume.VolumeMount(), dataVolume.VolumeMount()} var initContainers []corev1.Container if p.keystoreResources != nil { volumes = append(volumes, p.keystoreResources.Volume) initContainers = append(initContainers, p.keystoreResources.InitContainer) } v, err := version.Parse(p.Version) if err != nil { return corev1.PodTemplateSpec{}, err // error unlikely and should have been caught during validation } builder := defaults.NewPodTemplateBuilder(p.PodTemplate, apmv1.ApmServerContainerName). WithLabels(labels). WithAnnotations(annotations). WithResources(DefaultResources). WithDockerImage(p.CustomImageName, container.ImageRepository(container.APMServerImage, v)). WithReadinessProbe(readinessProbe(as.Spec.HTTP.TLS.Enabled())). WithPorts(ports). WithArgs(args...). WithEnv(env...). WithVolumes(volumes...). WithVolumeMounts(volumeMounts...). WithInitContainers(initContainers...) builder, err = withAssociationCACertsVolumes(builder, *as) if err != nil { return corev1.PodTemplateSpec{}, err } builder = withHTTPCertsVolume(builder, *as) return builder.WithInitContainerDefaults().PodTemplate, nil } func getDefaultContainerPorts(as apmv1.ApmServer) []corev1.ContainerPort { return []corev1.ContainerPort{{Name: as.Spec.HTTP.Protocol(), ContainerPort: int32(HTTPPort), Protocol: corev1.ProtocolTCP}} } func withHTTPCertsVolume(builder *defaults.PodTemplateBuilder, as apmv1.ApmServer) *defaults.PodTemplateBuilder { if !as.Spec.HTTP.TLS.Enabled() { return builder } vol := certificates.HTTPCertSecretVolume(Namer, as.Name) return builder.WithVolumes(vol.Volume()).WithVolumeMounts(vol.VolumeMount()) } func withAssociationCACertsVolumes(builder *defaults.PodTemplateBuilder, as apmv1.ApmServer) (*defaults.PodTemplateBuilder, error) { for _, association := range as.GetAssociations() { assocConf, err := association.AssociationConf() if err != nil { return nil, err } if !assocConf.CAIsConfigured() { continue } vol := volume.NewSecretVolumeWithMountPath( assocConf.GetCASecretName(), fmt.Sprintf("%s-certs", association.AssociationType()), filepath.Join(ApmBaseDir, certificatesDir(association.AssociationType())), ) builder.WithVolumes(vol.Volume()).WithVolumeMounts(vol.VolumeMount()) } return builder, nil }