in otelcollector/otel-allocator/main.go [39:259]
// main is the entry point of the target allocator. It loads and validates the
// configuration, builds the allocator, the HTTP(S) server, the Prometheus
// discovery manager, the target discoverer and the collector watcher, and then
// registers each of them as an actor in a run.Group and blocks in
// runGroup.Run until the group exits.
//
// Any failure during setup is logged and terminates the process with exit
// code 1.
func main() {
	var (
		// allocatorPrehook will be nil if filterStrategy is not set or
		// unrecognized. No filtering will be used in this case.
		allocatorPrehook prehook.Hook
		allocator        allocation.Allocator
		discoveryManager *discovery.Manager
		collectorWatcher *collector.Watcher
		promWatcher      allocatorWatcher.Watcher
		targetDiscoverer *target.Discoverer
		discoveryCancel  context.CancelFunc
		runGroup         run.Group
		eventChan        = make(chan allocatorWatcher.Event)
		eventCloser      = make(chan bool, 1)
		interrupts       = make(chan os.Signal, 1)
		errChan          = make(chan error)
	)

	// EULA statement is required for Arc extension
	clusterResourceId := os.Getenv("CLUSTER")
	if strings.EqualFold(clusterResourceId, "connectedclusters") {
		setupLog.Info("MICROSOFT SOFTWARE LICENSE TERMS\n\nMICROSOFT Azure Arc-enabled Kubernetes\n\nThis software is licensed to you as part of your or your company's subscription license for Microsoft Azure Services. You may only use the software with Microsoft Azure Services and subject to the terms and conditions of the agreement under which you obtained Microsoft Azure Services. If you do not have an active subscription license for Microsoft Azure Services, you may not use the software. Microsoft Azure Legal Information: https://azure.microsoft.com/en-us/support/legal/")
	}

	cfg, err := config.Load()
	if err != nil {
		// The logger comes from the config, so it is not available yet.
		fmt.Printf("Failed to load config: %v", err)
		os.Exit(1)
	}
	ctrl.SetLogger(cfg.RootLogger)

	if validationErr := config.ValidateConfig(cfg); validationErr != nil {
		setupLog.Error(validationErr, "Invalid configuration")
		os.Exit(1)
	}

	cfg.RootLogger.Info("Starting the Target Allocator")
	ctx := context.Background()
	log := ctrl.Log.WithName("allocator")

	allocatorPrehook = prehook.New(cfg.FilterStrategy, log)
	allocator, err = allocation.New(cfg.AllocationStrategy, log, allocation.WithFilter(allocatorPrehook), allocation.WithFallbackStrategy(cfg.AllocationFallbackStrategy))
	if err != nil {
		setupLog.Error(err, "Unable to initialize allocation strategy")
		os.Exit(1)
	}

	var httpOptions []server.Option
	if cfg.HTTPS.Enabled {
		tlsConfig, confErr := cfg.HTTPS.NewTLSConfig()
		if confErr != nil {
			setupLog.Error(confErr, "Unable to initialize TLS configuration")
			os.Exit(1)
		}
		httpOptions = append(httpOptions, server.WithTLSConfig(tlsConfig, cfg.HTTPS.ListenAddr))
	}
	srv := server.NewServer(log, allocator, cfg.ListenAddr, httpOptions...)

	discoveryCtx, discoveryCancel := context.WithCancel(ctx)
	sdMetrics, err := discovery.CreateAndRegisterSDMetrics(prometheus.DefaultRegisterer)
	if err != nil {
		setupLog.Error(err, "Unable to register metrics for Prometheus service discovery")
		os.Exit(1)
	}
	discoveryManager = discovery.NewManager(discoveryCtx, gokitlog.NewNopLogger(), prometheus.DefaultRegisterer, sdMetrics)

	targetDiscoverer = target.NewDiscoverer(log, discoveryManager, allocatorPrehook, srv, allocator.SetTargets)
	collectorWatcher, collectorWatcherErr := collector.NewCollectorWatcher(log, cfg.ClusterConfig, cfg.CollectorNotReadyGracePeriod)
	if collectorWatcherErr != nil {
		setupLog.Error(collectorWatcherErr, "Unable to initialize collector watcher")
		os.Exit(1)
	}

	signal.Notify(interrupts, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
	defer func() {
		// Unregister the channel before closing it so the signal package
		// can no longer attempt a send on a closed channel.
		signal.Stop(interrupts)
		close(interrupts)
	}()

	if cfg.PrometheusCR.Enabled {
		promWatcher, err = allocatorWatcher.NewPrometheusCRWatcher(ctx, setupLog.WithName("prometheus-cr-watcher"), *cfg)
		if err != nil {
			setupLog.Error(err, "Can't start the prometheus watcher")
			os.Exit(1)
		}
		// apply the initial configuration
		promConfig, loadErr := promWatcher.LoadConfig(ctx)
		if loadErr != nil {
			// Log loadErr (not err, which is nil here) so the cause is visible.
			setupLog.Error(loadErr, "Can't load initial Prometheus configuration from Prometheus CRs")
			os.Exit(1)
		}
		loadErr = targetDiscoverer.ApplyConfig(allocatorWatcher.EventSourcePrometheusCR, promConfig.ScrapeConfigs)
		if loadErr != nil {
			setupLog.Error(loadErr, "Can't load initial scrape targets from Prometheus CRs")
			os.Exit(1)
		}
		runGroup.Add(
			func() error {
				promWatcherErr := promWatcher.Watch(eventChan, errChan)
				setupLog.Info("Prometheus watcher exited")
				return promWatcherErr
			},
			func(_ error) {
				setupLog.Info("Closing prometheus watcher")
				promWatcherErr := promWatcher.Close()
				if promWatcherErr != nil {
					setupLog.Error(promWatcherErr, "prometheus watcher failed to close")
				}
			})
	}

	// Discovery manager actor; cancelled through its dedicated context.
	runGroup.Add(
		func() error {
			discoveryManagerErr := discoveryManager.Run()
			setupLog.Info("Discovery manager exited")
			return discoveryManagerErr
		},
		func(_ error) {
			setupLog.Info("Closing discovery manager")
			discoveryCancel()
		})

	// Target discoverer actor.
	runGroup.Add(
		func() error {
			// Initial loading of the config file's scrape config
			if cfg.PromConfig != nil && len(cfg.PromConfig.ScrapeConfigs) > 0 {
				// Use a closure-local error: this runs inside a run-group
				// actor and must not write to main's shared err variable.
				if applyErr := targetDiscoverer.ApplyConfig(allocatorWatcher.EventSourceConfigMap, cfg.PromConfig.ScrapeConfigs); applyErr != nil {
					setupLog.Error(applyErr, "Unable to apply initial configuration")
					return applyErr
				}
			} else {
				setupLog.Info("Prometheus config empty, skipping initial discovery configuration")
			}
			runErr := targetDiscoverer.Run()
			setupLog.Info("Target discoverer exited")
			return runErr
		},
		func(_ error) {
			setupLog.Info("Closing target discoverer")
			targetDiscoverer.Close()
		})

	// Collector watcher actor; feeds collector changes to the allocator.
	runGroup.Add(
		func() error {
			watchErr := collectorWatcher.Watch(cfg.CollectorNamespace, cfg.CollectorSelector, allocator.SetCollectors)
			setupLog.Info("Collector watcher exited")
			return watchErr
		},
		func(_ error) {
			setupLog.Info("Closing collector watcher")
			collectorWatcher.Close()
		})

	// HTTP server actor.
	runGroup.Add(
		func() error {
			startErr := srv.Start()
			setupLog.Info("Server failed to start")
			return startErr
		},
		func(_ error) {
			setupLog.Info("Closing server")
			if shutdownErr := srv.Shutdown(ctx); shutdownErr != nil {
				setupLog.Error(shutdownErr, "Error on server shutdown")
			}
		})

	// HTTPS server actor, only when HTTPS is enabled in the config.
	if cfg.HTTPS.Enabled {
		runGroup.Add(
			func() error {
				startErr := srv.StartHTTPS()
				setupLog.Info("HTTPS Server failed to start")
				return startErr
			},
			func(_ error) {
				setupLog.Info("Closing HTTPS server")
				if shutdownErr := srv.ShutdownHTTPS(ctx); shutdownErr != nil {
					setupLog.Error(shutdownErr, "Error on HTTPS server shutdown")
				}
			})
	}

	// Watcher event loop: reloads and applies the scrape configuration on
	// every watcher event, and logs watcher errors. A failed reload is
	// logged and skipped; the loop only ends when eventCloser is closed.
	runGroup.Add(
		func() error {
			for {
				select {
				case event := <-eventChan:
					eventsMetric.WithLabelValues(event.Source.String()).Inc()
					loadConfig, err := event.Watcher.LoadConfig(ctx)
					if err != nil {
						setupLog.Error(err, "Unable to load configuration")
						continue
					}
					err = targetDiscoverer.ApplyConfig(event.Source, loadConfig.ScrapeConfigs)
					if err != nil {
						setupLog.Error(err, "Unable to apply configuration")
						continue
					}
				case err := <-errChan:
					setupLog.Error(err, "Watcher error")
				case <-eventCloser:
					return nil
				}
			}
		},
		func(_ error) {
			setupLog.Info("Closing watcher loop")
			close(eventCloser)
		})

	// Interrupt actor: returns on the first received signal, or once
	// eventCloser has been closed by the watcher loop's interrupt function.
	runGroup.Add(
		func() error {
			for {
				select {
				case <-interrupts:
					setupLog.Info("Received interrupt")
					return nil
				case <-eventCloser:
					return nil
				}
			}
		},
		func(_ error) {
			setupLog.Info("Closing interrupt loop")
		})

	if runErr := runGroup.Run(); runErr != nil {
		setupLog.Error(runErr, "run group exited")
	}
	setupLog.Info("Target allocator exited.")
}