in prometheus-to-sd/main.go [96:204]
func main() {
flag.Set("logtostderr", "true")
flag.Var(&source, "source", "source(s) to watch in [component-name]:[http|https]://host:port/path?whitelisted=a,b,c&podIdLabel=d&namespaceIdLabel=e&containerNameLabel=f&metricsPrefix=prefix&authToken=token&authUsername=user&authPassword=password format. Can be specified multiple times")
flag.Var(&dynamicSources, "dynamic-source",
`dynamic source(s) to watch in format: "[component-name]:[http|https]://:port/path?whitelisted=metric1,metric2&podIdLabel=label1&namespaceIdLabel=label2&containerNameLabel=label3&metricsPrefix=prefix&authToken=token&authUsername=user&authPassword=password". Dynamic sources are components (on the same node) discovered dynamically using the kubernetes api.`,
)
defer glog.Flush()
flag.Parse()
if *delayedShutdownTimeout < 0 {
signal.Ignore(syscall.SIGTERM)
} else {
sigTermChannel := make(chan os.Signal, 1)
signal.Notify(sigTermChannel, syscall.SIGTERM)
go func() {
<-sigTermChannel
glog.Infof("SIGTERM has been received, Waiting %s before the shutdown.", delayedShutdownTimeout.String())
time.Sleep(*delayedShutdownTimeout)
glog.Info("Shutting down after receiving SIGTERM.")
os.Exit(0)
}()
}
gceConf, err := config.GetGceConfig(*projectOverride, *clusterNameOverride, *clusterLocationOverride, *zoneOverride, *nodeOverride)
if err != nil {
glog.Fatalf("Failed to get GCE config: %v", err)
}
glog.Infof("GCE config: %+v", gceConf)
sourceConfigs := getSourceConfigs(*metricsPrefix, gceConf)
glog.Infof("Built the following source configs: %v", sourceConfigs)
monitoredResourceLabels := parseMonitoredResourceLabels(*monitoredResourceLabels)
if len(monitoredResourceLabels) > 0 {
if *monitoredResourceTypePrefix == "" {
glog.Fatalf("When 'monitored-resource-labels' is specified, 'monitored-resource-type-prefix' cannot be empty.")
}
glog.Infof("Monitored resource labels: %v", monitoredResourceLabels)
}
go func() {
http.Handle("/metrics", promhttp.Handler())
glog.Error(http.ListenAndServe(fmt.Sprintf("%s:%d", *listenAddress, *metricsPort), nil))
}()
go func() {
glog.Error(http.ListenAndServe(fmt.Sprintf("%s:%d", *debugAddress, *debugPort), expvar.Handler()))
}()
var options []option.ClientOption
if *gceTokenURL != "" {
ts := config.NewAltTokenSource(*gceTokenURL, *gceTokenBody)
options = append(options, option.WithTokenSource(ts))
}
uDomain := ""
if *apioverride != "" {
glog.Infof("Stackdriver API endpoint is overridden to %s", *apioverride)
// option.WithEndpoint() only works with "host:port"
endpoint, err := getEndpoint(*apioverride)
if err != nil {
glog.Fatalf("Error parsing --api-override: %s, with error: %v", *apioverride, err)
}
options = append(options, option.WithEndpoint(endpoint))
// infer universe domain from Stackdriver api endpoint
_, domain, found := strings.Cut(*apioverride, ".")
if found {
uDomain = strings.TrimSuffix(domain, "/")
}
}
// override inferred universe domain with specified value
if *universeDomain != "" {
uDomain = *universeDomain
}
if uDomain != "" {
glog.Infof("Universe domain is %s", uDomain)
options = append(options, option.WithUniverseDomain(uDomain))
}
ctx := context.Background()
client, err := monitoring.NewMetricClient(ctx, options...)
if err != nil {
glog.Fatalf("Failed to create client: %v", err)
}
glog.V(4).Infof("Successfully created gcm client")
if len(sourceConfigs) == 0 {
glog.Fatalf("No sources defined. Please specify at least one --source flag.")
}
if *scrapeInterval > *exportInterval {
glog.Fatalf("--scrape-interval cannot be bigger than --export-interval")
}
for _, sourceConfig := range sourceConfigs {
glog.V(4).Infof("Starting goroutine for %+v", sourceConfig)
// Pass sourceConfig as a parameter to avoid using the last sourceConfig by all goroutines.
go readAndPushDataToStackdriver(ctx, client, gceConf, sourceConfig, monitoredResourceLabels, *monitoredResourceTypePrefix)
}
// As worker goroutines work forever, block main thread as well.
<-make(chan int)
}