prometheus-to-sd/config/dynamic_source.go (113 lines of code) (raw):

package config import ( "context" "errors" "fmt" "net/url" "sort" "github.com/GoogleCloudPlatform/k8s-stackdriver/prometheus-to-sd/flags" "github.com/golang/glog" core "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" ) const ( podNamespace = "kube-system" nameLabel = "k8s-app" ) // SourceConfigsFromDynamicSources takes pod specifications from the Kubernetes API and maps them to source configs. func SourceConfigsFromDynamicSources(gceConfig *GceConfig, sources []flags.Uri) ([]*SourceConfig, error) { if len(sources) == 0 { return nil, nil } sourceMap, err := validateSources(sources) if err != nil { return nil, err } kubeApi, err := createKubernetesApiClient() if err != nil { return nil, err } podResponse, err := kubeApi.CoreV1().Pods(podNamespace).List(context.TODO(), createOptionsForPodSelection(gceConfig.Instance, sourceMap)) if err != nil { return nil, err } return getConfigsFromPods(podResponse.Items, sourceMap), nil } func validateSources(sources flags.Uris) (map[string]url.URL, error) { sourceMap := make(map[string]url.URL) for _, source := range sources { if source.Val.Hostname() != "" { return nil, errors.New("hostname should be empty for all dynamic sources") } if source.Key == "" { return nil, errors.New("component name should NOT be empty for any dynamic source") } if source.Val.Port() == "" { return nil, errors.New("port should NOT be empty for any dynamic source") } sourceMap[source.Key] = source.Val } if len(sourceMap) != len(sources) { return nil, errors.New("source should have unique component names") } return sourceMap, nil } func createKubernetesApiClient() (clientset.Interface, error) { conf, err := rest.InClusterConfig() if err != nil { return nil, err } return clientset.NewForConfig(conf) } func createOptionsForPodSelection(nodeName string, sources map[string]url.URL) v1.ListOptions { var nameList string var keys []string for key := range sources { keys = append(keys, key) } sort.Strings(keys) for _, key := range keys { nameList += key + "," } labelSelector := fmt.Sprintf("%s in (%s)", nameLabel, nameList) return v1.ListOptions{ FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeName), LabelSelector: labelSelector, } } func getConfigsFromPods(pods []core.Pod, sources map[string]url.URL) []*SourceConfig { var sourceConfigs []*SourceConfig for _, pod := range pods { componentName := pod.Labels[nameLabel] source, _ := sources[componentName] sourceConfig, err := mapToSourceConfig(componentName, source, pod.Status.PodIP, pod.Name, pod.Namespace) if err != nil { glog.Warningf("could not create source config for pod %s: %v", pod.Name, err) } sourceConfigs = append(sourceConfigs, sourceConfig) } return sourceConfigs } func mapToSourceConfig(componentName string, url url.URL, ip string, podId, namespaceId string) (*SourceConfig, error) { protocol := url.Scheme port := url.Port() values := url.Query() whitelisted := values.Get("whitelisted") podIdLabel := values.Get("podIdLabel") namespaceIdLabel := values.Get("namespaceIdLabel") containerNamelabel := values.Get("containerNamelabel") metricsPrefix := values.Get("metricsPrefix") customResource := values.Get("customResourceType") customLabels := getMap(values, "customLabels") auth, err := parseAuthConfig(url) if err != nil { return nil, err } podConfig := NewPodConfig(podId, namespaceId, podIdLabel, namespaceIdLabel, containerNamelabel) whitelistedLabelsMap, err := parseWhitelistedLabels(url.Query().Get("whitelistedLabels")) if err != nil { return nil, err } return newSourceConfig(componentName, protocol, ip, port, url.Path, *auth, whitelisted, metricsPrefix, podConfig, whitelistedLabelsMap, customResource, customLabels) }