otelcollector/configuration-reader-builder/main.go:
package main
import (
	"fmt"
	"io/fs"
	"log"
	"net/http"
	"os"
	"os/exec"
	"strings"
	"time"

	configmapsettings "github.com/prometheus-collector/shared/configmap/mp"
	"github.com/prometheus/common/model"
	yaml "gopkg.in/yaml.v2"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
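
// PrometheusCRConfig holds the TargetAllocator's settings for discovering
// Prometheus custom resources (PodMonitors, ServiceMonitors, ScrapeConfigs,
// and Probes).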
type PrometheusCRConfig struct {
	Enabled                         bool                  `yaml:"enabled,omitempty"`
	AllowNamespaces                 []string              `yaml:"allow_namespaces,omitempty"`
	DenyNamespaces                  []string              `yaml:"deny_namespaces,omitempty"`
	PodMonitorSelector              *metav1.LabelSelector `yaml:"pod_monitor_selector,omitempty"`
	PodMonitorNamespaceSelector     *metav1.LabelSelector `yaml:"pod_monitor_namespace_selector,omitempty"`
	ServiceMonitorSelector          *metav1.LabelSelector `yaml:"service_monitor_selector,omitempty"`
	ServiceMonitorNamespaceSelector *metav1.LabelSelector `yaml:"service_monitor_namespace_selector,omitempty"`
	ScrapeConfigSelector            *metav1.LabelSelector `yaml:"scrape_config_selector,omitempty"`
	ScrapeConfigNamespaceSelector   *metav1.LabelSelector `yaml:"scrape_config_namespace_selector,omitempty"`
	ProbeSelector                   *metav1.LabelSelector `yaml:"probe_selector,omitempty"`
	ProbeNamespaceSelector          *metav1.LabelSelector `yaml:"probe_namespace_selector,omitempty"`
	ScrapeInterval                  model.Duration        `yaml:"scrape_interval,omitempty"`
}
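
// Config is the top-level TargetAllocator configuration that this reader
// generates and writes to targetallocator.yaml.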
type Config struct {
	CollectorSelector  *metav1.LabelSelector  `yaml:"collector_selector,omitempty"`
	Config             map[string]interface{} `yaml:"config"`
	AllocationStrategy string                 `yaml:"allocation_strategy,omitempty"`
	PrometheusCR       PrometheusCRConfig     `yaml:"prometheus_cr,omitempty"`
	FilterStrategy     string                 `yaml:"filter_strategy,omitempty"`
}
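
// OtelConfig models only the parts of the merged otel collector configuration
// that this reader needs, primarily the Prometheus receiver's scrape config.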
type OtelConfig struct {
	Exporters  interface{} `yaml:"exporters"`
	Processors interface{} `yaml:"processors"`
	Extensions interface{} `yaml:"extensions"`
	Receivers  struct {
		Prometheus struct {
			Config          map[string]interface{} `yaml:"config"`
			TargetAllocator interface{}            `yaml:"target_allocator"`
		} `yaml:"prometheus"`
	} `yaml:"receivers"`
	Service struct {
		Extensions interface{} `yaml:"extensions"`
		Pipelines  struct {
			Metrics struct {
				Exporters  interface{} `yaml:"exporters"`
				Processors interface{} `yaml:"processors"`
				Receivers  interface{} `yaml:"receivers"`
			} `yaml:"metrics"`
			MetricsTelemetry struct {
				Exporters  interface{} `yaml:"exporters,omitempty"`
				Processors interface{} `yaml:"processors,omitempty"`
				Receivers  interface{} `yaml:"receivers,omitempty"`
			} `yaml:"metrics/telemetry,omitempty"`
		} `yaml:"pipelines"`
		Telemetry struct {
			Logs struct {
				Level    interface{} `yaml:"level"`
				Encoding interface{} `yaml:"encoding"`
			} `yaml:"logs"`
		} `yaml:"telemetry"`
	} `yaml:"service"`
}
// ANSI escape codes used to colorize fatal log output.
var RESET = "\033[0m"
var RED = "\033[31m"

var taConfigFilePath = "/ta-configuration/targetallocator.yaml"

// taConfigUpdated and taLivenessStartTime track when the TargetAllocator
// config file was last rewritten, so the /health-ta probe can report
// ServiceUnavailable long enough for the TargetAllocator to restart and
// pick up the new config.
var taConfigUpdated = false
var taLivenessStartTime = time.Time{}
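
// logFatalError logs the message in red; log.Fatalf then exits the process
// with status 1, so callers don't need to call os.Exit themselves.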
func logFatalError(message string) {
	log.Fatalf("%s%s%s", RED, message, RESET)
}
// sanitizeRelabelConfigs replaces "$$" with "$" in the regex and replacement
// fields of a list of relabel configs. The "$$" escaping added by
// promconfigvalidator is required by the validator's otel config get method,
// but the TargetAllocator doesn't do env substitution, so it has to be
// stripped here or the TargetAllocator crashes.
func sanitizeRelabelConfigs(relabelConfigs []interface{}) {
	for _, rc := range relabelConfigs {
		relabelConfig, ok := rc.(map[interface{}]interface{})
		if !ok {
			continue
		}
		// regex can be a non-string type (e.g. a boolean), so only rewrite it
		// when the assertion to string succeeds; same for replacement.
		if regexString, isString := relabelConfig["regex"].(string); isString {
			relabelConfig["regex"] = strings.ReplaceAll(regexString, "$$", "$")
		}
		if replacement, isString := relabelConfig["replacement"].(string); isString {
			relabelConfig["replacement"] = strings.ReplaceAll(replacement, "$$", "$")
		}
	}
}

// updateTAConfigFile reads the merged otel collector configuration, extracts
// the Prometheus receiver's scrape config, sanitizes it, and writes it out as
// the TargetAllocator's configuration file.
func updateTAConfigFile(configFilePath string) {
	defaultsMergedConfigFileContents, err := os.ReadFile(configFilePath)
	if err != nil {
		logFatalError(fmt.Sprintf("config-reader::Unable to read file contents from: %s - %v\n", configFilePath, err))
	}
	var otelConfig OtelConfig
	if err := yaml.Unmarshal(defaultsMergedConfigFileContents, &otelConfig); err != nil {
		logFatalError(fmt.Sprintf("config-reader::Unable to unmarshal merged otel configuration from: %s - %v\n", configFilePath, err))
	}
	promScrapeConfig := otelConfig.Receivers.Prometheus.Config
	// Strip the "$$" escaping from the relabel_configs and
	// metric_relabel_configs of every scrape config.
	if scrapeConfigs, ok := promScrapeConfig["scrape_configs"].([]interface{}); ok {
		for _, sc := range scrapeConfigs {
			scrapeConfig, ok := sc.(map[interface{}]interface{})
			if !ok {
				continue
			}
			if relabelConfigs, ok := scrapeConfig["relabel_configs"].([]interface{}); ok {
				sanitizeRelabelConfigs(relabelConfigs)
			}
			if metricRelabelConfigs, ok := scrapeConfig["metric_relabel_configs"].([]interface{}); ok {
				sanitizeRelabelConfigs(metricRelabelConfigs)
			}
		}
	}
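	// For illustration only: with the hard-coded selectors below, the
	// generated targetallocator.yaml has roughly this shape (the config
	// section carries the merged Prometheus scrape config, so its actual
	// contents vary):
	//
	//   collector_selector:
	//     matchlabels:
	//       kubernetes.azure.com/managedby: aks
	//       rsName: ama-metrics
	//   config:
	//     scrape_configs: [...]
	//   allocation_strategy: consistent-hashing
	//   prometheus_cr:
	//     pod_monitor_selector: {}
	//     service_monitor_selector: {}
	//   filter_strategy: relabel-config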
	targetAllocatorConfig := Config{
		AllocationStrategy: "consistent-hashing",
		FilterStrategy:     "relabel-config",
		CollectorSelector: &metav1.LabelSelector{
			MatchLabels: map[string]string{
				"rsName":                         "ama-metrics",
				"kubernetes.azure.com/managedby": "aks",
			},
		},
		Config: promScrapeConfig,
		PrometheusCR: PrometheusCRConfig{
			ServiceMonitorSelector: &metav1.LabelSelector{},
			PodMonitorSelector:     &metav1.LabelSelector{},
		},
	}
	targetAllocatorConfigYaml, err := yaml.Marshal(targetAllocatorConfig)
	if err != nil {
		logFatalError(fmt.Sprintf("config-reader::Unable to marshal target allocator configuration - %v\n", err))
	}
	if err := os.WriteFile(taConfigFilePath, targetAllocatorConfigYaml, 0644); err != nil {
		logFatalError(fmt.Sprintf("config-reader::Unable to write to: %s - %v\n", taConfigFilePath, err))
	}
	log.Println("Updated file - targetallocator.yaml for the TargetAllocator to pick up new config changes")
	taConfigUpdated = true
	taLivenessStartTime = time.Now()
}
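
// hasConfigChanged reports whether the inotify output file exists and is
// non-empty, i.e. whether inotify has recorded any filesystem events.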
func hasConfigChanged(filePath string) bool {
	fileInfo, err := os.Stat(filePath)
	if err != nil {
		return false
	}
	return fileInfo.Size() > 0
}
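
// taHealthHandler serves the TargetAllocator's liveness probe on /health-ta.
// It proxies the TargetAllocator's own /metrics endpoint and, after a config
// update, reports ServiceUnavailable for 60 seconds so the probe restarts the
// TargetAllocator to pick up the new configuration.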
func taHealthHandler(w http.ResponseWriter, r *http.Request) {
	status := http.StatusOK
	message := "\ntargetallocator is running."
	client := &http.Client{Timeout: 2 * time.Second}
	resp, err := client.Get("http://localhost:8080/metrics")
	if resp != nil && resp.Body != nil {
		defer resp.Body.Close()
	}
	if err != nil || resp.StatusCode != http.StatusOK {
		status = http.StatusServiceUnavailable
		message = "\ncall to get TA metrics failed"
	} else if taConfigUpdated && !taLivenessStartTime.IsZero() {
		// Serve ServiceUnavailable for 60s after a config update, then reset.
		if time.Since(taLivenessStartTime).Seconds() < 60 {
			status = http.StatusServiceUnavailable
			message += "\ntargetallocator-config changed"
		} else {
			taConfigUpdated = false
			taLivenessStartTime = time.Time{}
		}
	}
	if status != http.StatusOK {
		fmt.Print(message)
	}
	w.WriteHeader(status)
	fmt.Fprintln(w, message)
}
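
// writeTerminationLog writes the message to the Kubernetes termination log so
// the reason shows up in the pod status when the container is restarted.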
func writeTerminationLog(message string) {
	if err := os.WriteFile("/dev/termination-log", []byte(message), fs.FileMode(0644)); err != nil {
		log.Printf("Error writing to termination log: %v", err)
	}
}
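
// healthHandler serves the config reader's own liveness probe: it reports
// ServiceUnavailable once inotify has recorded any change under
// /etc/config/settings, so the pod restarts and re-reads the configmap.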
func healthHandler(w http.ResponseWriter, r *http.Request) {
	status := http.StatusOK
	message := "\nconfig-reader is running."
	if hasConfigChanged("/opt/inotifyoutput.txt") {
		status = http.StatusServiceUnavailable
		message += "\ninotifyoutput.txt has been updated - config-reader-config changed"
	}
	w.WriteHeader(status)
	fmt.Fprintln(w, message)
	if status != http.StatusOK {
		fmt.Print(message)
		writeTerminationLog(message)
	}
}
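
// main starts an inotify watch on the settings configmap for the liveness
// probe, generates the TargetAllocator configuration from the merged
// collector config, and serves the health endpoints on :8081.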
func main() {
	// The /health endpoint treats a non-empty inotify output file as a config
	// change, so start from an empty file.
	outputFile, err := os.Create("/opt/inotifyoutput.txt")
	if err != nil {
		log.Fatalf("Error creating inotify output file: %v\n", err)
	}
	outputFile.Close()
	// Define the command to start inotify for config reader's liveness probe
	inotifyCommandCfg := exec.Command(
		"inotifywait",
		"/etc/config/settings",
		"--daemon",
		"--recursive",
		"--outfile", "/opt/inotifyoutput.txt",
		"--event", "create",
		"--event", "delete",
		"--format", "%e : %T",
		"--timefmt", "+%s",
	)
	// Start the inotify process
	if err := inotifyCommandCfg.Start(); err != nil {
		log.Fatalf("Error starting inotify process for config reader's liveness probe: %v\n", err)
	}
	configmapsettings.Configmapparser()
	if os.Getenv("AZMON_USE_DEFAULT_PROMETHEUS_CONFIG") == "true" {
		if _, err := os.Stat("/opt/microsoft/otelcollector/collector-config-default.yml"); err == nil {
			updateTAConfigFile("/opt/microsoft/otelcollector/collector-config-default.yml")
		}
	} else if _, err := os.Stat("/opt/microsoft/otelcollector/collector-config.yml"); err == nil {
		updateTAConfigFile("/opt/microsoft/otelcollector/collector-config.yml")
	} else {
		log.Println("No configs found via configmap, not running config reader")
	}
	http.HandleFunc("/health", healthHandler)
	http.HandleFunc("/health-ta", taHealthHandler)
	log.Fatal(http.ListenAndServe(":8081", nil))
}