kubernetes/metadata/metadata.go (143 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package metadata
import (
"context"
"fmt"
"strings"
"gopkg.in/yaml.v2"
"github.com/elastic/elastic-agent-autodiscover/kubernetes"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/safemapstr"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sclient "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
// MetaGen allows creation of metadata from either Kubernetes resources or their Resource names.
type MetaGen interface {
// Generate generates metadata for a given resource.
// Metadata map is formed in the following format:
// {
// "kubernetes": GenerateK8s(),
// "some.ecs.field": "asdf, // populated by GenerateECS()
// }
// This method is called in top level and returns the complete map of metadata.
Generate(kubernetes.Resource, ...FieldOptions) mapstr.M
// GenerateFromName generates metadata for a given resource based on it's name
GenerateFromName(string, ...FieldOptions) mapstr.M
// GenerateK8s generates kubernetes metadata for a given resource
GenerateK8s(kubernetes.Resource, ...FieldOptions) mapstr.M
// GenerateECS generates ECS metadata for a given resource
GenerateECS(kubernetes.Resource) mapstr.M
}
// FieldOptions allows additional enrichment to be done on top of existing metadata
type FieldOptions func(mapstr.M)
type ClusterInfo struct {
URL string
Name string
}
type ClusterConfiguration struct {
ControlPlaneEndpoint string `yaml:"controlPlaneEndpoint"`
ClusterName string `yaml:"clusterName"`
}
// WithFields FieldOption allows adding specific fields into the generated metadata
func WithFields(key string, value interface{}) FieldOptions {
return func(meta mapstr.M) {
_ = safemapstr.Put(meta, key, value)
}
}
// WithMetadata FieldOption allows adding labels and annotations under sub-resource(kind)
// example if kind=namespace namespace.labels key will be added
func WithMetadata(kind string) FieldOptions {
return func(meta mapstr.M) {
if meta["labels"] != nil {
_ = safemapstr.Put(meta, strings.ToLower(kind)+".labels", meta["labels"])
}
if meta["annotations"] != nil {
_ = safemapstr.Put(meta, strings.ToLower(kind)+".annotations", meta["annotations"])
}
}
}
// GetPodMetaGen is a wrapper function that creates a metaGen for pod resource and has embeeded
// nodeMetaGen and namespaceMetaGen
func GetPodMetaGen(
cfg *config.C,
podWatcher kubernetes.Watcher,
nodeWatcher kubernetes.Watcher,
namespaceWatcher kubernetes.Watcher,
replicasetWatcher kubernetes.Watcher,
jobWatcher kubernetes.Watcher,
metaConf *AddResourceMetadataConfig) MetaGen {
var nodeMetaGen, namespaceMetaGen, rsMetaGen, jobMetaGen MetaGen
if nodeWatcher != nil && metaConf.Node.Enabled() {
nodeMetaGen = NewNodeMetadataGenerator(metaConf.Node, nodeWatcher.Store(), nodeWatcher.Client())
}
if namespaceWatcher != nil && metaConf.Namespace.Enabled() {
namespaceMetaGen = NewNamespaceMetadataGenerator(metaConf.Namespace, namespaceWatcher.Store(), namespaceWatcher.Client())
}
if replicasetWatcher != nil && metaConf.Deployment {
rsMetaGen = NewReplicasetMetadataGenerator(cfg, replicasetWatcher.Store(), replicasetWatcher.Client())
}
if jobWatcher != nil && metaConf.CronJob {
jobMetaGen = NewJobMetadataGenerator(cfg, jobWatcher.Store(), jobWatcher.Client())
}
metaGen := NewPodMetadataGenerator(
cfg,
podWatcher.Store(),
podWatcher.Client(),
nodeMetaGen,
namespaceMetaGen,
rsMetaGen,
jobMetaGen,
metaConf)
return metaGen
}
// GetKubernetesClusterIdentifier returns ClusterInfo for k8s if available
func GetKubernetesClusterIdentifier(cfg *config.C, client k8sclient.Interface) (ClusterInfo, error) {
// try with kube config file
var c Config
err := c.Unmarshal(cfg)
if err != nil {
return ClusterInfo{}, err
}
clusterInfo, err := getClusterInfoFromKubeConfigFile(c.KubeConfig)
if err == nil {
return clusterInfo, nil
}
// try with kubeadm-config configmap only if config_kubeAdm == true
clusterInfo, err = getClusterInfoFromKubeadmConfigMap(client, c.KubeAdm)
if err == nil {
return clusterInfo, nil
}
return ClusterInfo{}, fmt.Errorf("unable to retrieve cluster identifiers")
}
func getClusterInfoFromKubeadmConfigMap(client k8sclient.Interface, kubeadm bool) (ClusterInfo, error) {
clusterInfo := ClusterInfo{}
if client == nil {
return clusterInfo, fmt.Errorf("unable to get cluster identifiers from kubeadm-config")
}
if !kubeadm {
return clusterInfo, nil
}
cm, err := client.CoreV1().ConfigMaps("kube-system").Get(context.TODO(), "kubeadm-config", metav1.GetOptions{})
if err != nil {
return clusterInfo, fmt.Errorf("unable to get cluster identifiers from kubeadm-config: %w", err)
}
p, ok := cm.Data["ClusterConfiguration"]
if !ok {
return clusterInfo, fmt.Errorf("unable to get cluster identifiers from ClusterConfiguration")
}
cc := &ClusterConfiguration{}
err = yaml.Unmarshal([]byte(p), cc)
if err != nil {
return ClusterInfo{}, err
}
if cc.ClusterName != "" {
clusterInfo.Name = cc.ClusterName
}
if cc.ControlPlaneEndpoint != "" {
clusterInfo.URL = cc.ControlPlaneEndpoint
}
return clusterInfo, nil
}
func getClusterInfoFromKubeConfigFile(kubeconfig string) (ClusterInfo, error) {
if kubeconfig == "" {
kubeconfig = kubernetes.GetKubeConfigEnvironmentVariable()
}
if kubeconfig == "" {
return ClusterInfo{}, fmt.Errorf("unable to get cluster identifiers from kube_config from env")
}
cfg, err := kubernetes.BuildConfig(kubeconfig)
if err != nil {
return ClusterInfo{}, fmt.Errorf("unable to build kube config due to error: %w", err)
}
kubeCfg, err := clientcmd.LoadFromFile(kubeconfig)
if err != nil {
return ClusterInfo{}, fmt.Errorf("unable to load kube_config due to error: %w", err)
}
for key, element := range kubeCfg.Clusters {
if element.Server == cfg.Host {
return ClusterInfo{element.Server, key}, nil
}
}
return ClusterInfo{}, fmt.Errorf("unable to get cluster identifiers from kube_config")
}