in collector/service.go [361:466]
func (s *Service) Open(ctx context.Context) error {
ctx, s.cancel = context.WithCancel(ctx)
if err := s.store.Open(ctx); err != nil {
return fmt.Errorf("failed to open wal store: %w", err)
}
if err := s.metricsSvc.Open(ctx); err != nil {
return fmt.Errorf("failed to open metrics service: %w", err)
}
for _, workerSvc := range s.workerSvcs {
if err := workerSvc.Open(ctx); err != nil {
return fmt.Errorf("failed to open worker service: %w", err)
}
}
if err := s.replicator.Open(ctx); err != nil {
return err
}
if err := s.batcher.Open(ctx); err != nil {
return err
}
if s.scraper != nil {
if err := s.scraper.Open(ctx); err != nil {
return err
}
}
listenerFunc := plaintextListenerFunc()
if s.opts.TLSCertFile != "" && s.opts.TLSKeyFile != "" {
logger.Infof("TLS enabled for listeners")
cert, err := tls.LoadX509KeyPair(s.opts.TLSCertFile, s.opts.TLSKeyFile)
if err != nil {
return fmt.Errorf("failed to load cert and key: %w", err)
}
listenerFunc = tlsListenerFunc(cert)
}
s.httpServers = []*http.HttpServer{}
listener, err := listenerFunc(s.opts.ListenAddr)
if err != nil {
return err
}
opts := &http.ServerOpts{
MaxConns: s.opts.MaxConnections,
WriteTimeout: 30 * time.Second,
Listener: listener,
}
primaryHttp := http.NewServer(opts)
primaryHttp.RegisterHandler("/metrics", promhttp.Handler())
if s.opts.EnablePprof {
opts.WriteTimeout = 60 * time.Second
primaryHttp.RegisterHandlerFunc("/debug/pprof/", pprof.Index)
primaryHttp.RegisterHandlerFunc("/debug/pprof/cmdline", pprof.Cmdline)
primaryHttp.RegisterHandlerFunc("/debug/pprof/profile", pprof.Profile)
primaryHttp.RegisterHandlerFunc("/debug/pprof/symbol", pprof.Symbol)
primaryHttp.RegisterHandlerFunc("/debug/pprof/trace", pprof.Trace)
}
for _, handler := range s.httpHandlers {
primaryHttp.RegisterHandlerFunc(handler.Path, handler.Handler)
}
s.httpServers = append(s.httpServers, primaryHttp)
for _, handler := range s.grpcHandlers {
listener, err := listenerFunc(fmt.Sprintf(":%d", handler.Port))
if err != nil {
return err
}
server := http.NewServer(&http.ServerOpts{
MaxConns: s.opts.MaxConnections,
Listener: listener,
})
server.RegisterHandler(handler.Path, handler.Handler)
s.httpServers = append(s.httpServers, server)
}
for _, httpServer := range s.httpServers {
if err := httpServer.Open(ctx); err != nil {
return err
}
logger.Infof("Started %s", httpServer)
}
go func() {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
metrics.CollectorHealthCheck.WithLabelValues(s.opts.Region).Set(1)
}
}
}()
return nil
}