func()

in collector/service.go [361:466]


// Open starts the collector service.
//
// It derives a cancellable context from ctx (the cancel function is stored in
// s.cancel) and then, in order:
//
//  1. opens the WAL store, the metrics service, every worker service, the
//     replicator, the batcher and, when one is configured, the scraper;
//  2. picks a plaintext or TLS listener factory (TLS is used only when both
//     TLSCertFile and TLSKeyFile are set);
//  3. builds the primary HTTP server on ListenAddr with /metrics, the optional
//     pprof endpoints and every handler in s.httpHandlers;
//  4. builds one additional server per entry in s.grpcHandlers, each on its
//     own port;
//  5. opens every server;
//  6. launches a goroutine that sets the collector health-check gauge once a
//     minute until the derived context is cancelled.
//
// The first failure aborts the sequence and is returned wrapped with the name
// of the step that failed. On failure the derived context is cancelled before
// returning, so work already started with it is signalled to stop instead of
// being left attached to a context nobody cancels.
//
// NOTE(review): listeners that were already bound when a later step fails are
// not explicitly closed here; confirm whether the caller's Close path (or
// context cancellation) releases them.
func (s *Service) Open(ctx context.Context) error {
	ctx, s.cancel = context.WithCancel(ctx)

	// Cancel the derived context on every error return. Calling a
	// context.CancelFunc more than once is harmless, so a later Close that
	// also cancels is unaffected.
	opened := false
	defer func() {
		if !opened {
			s.cancel()
		}
	}()

	if err := s.store.Open(ctx); err != nil {
		return fmt.Errorf("failed to open wal store: %w", err)
	}

	if err := s.metricsSvc.Open(ctx); err != nil {
		return fmt.Errorf("failed to open metrics service: %w", err)
	}

	for _, workerSvc := range s.workerSvcs {
		if err := workerSvc.Open(ctx); err != nil {
			return fmt.Errorf("failed to open worker service: %w", err)
		}
	}

	if err := s.replicator.Open(ctx); err != nil {
		return fmt.Errorf("failed to open replicator: %w", err)
	}

	if err := s.batcher.Open(ctx); err != nil {
		return fmt.Errorf("failed to open batcher: %w", err)
	}

	// The scraper is optional.
	if s.scraper != nil {
		if err := s.scraper.Open(ctx); err != nil {
			return fmt.Errorf("failed to open scraper: %w", err)
		}
	}

	// Default to plaintext; switch to TLS only when both halves of the key
	// pair are configured.
	listenerFunc := plaintextListenerFunc()
	if s.opts.TLSCertFile != "" && s.opts.TLSKeyFile != "" {
		cert, err := tls.LoadX509KeyPair(s.opts.TLSCertFile, s.opts.TLSKeyFile)
		if err != nil {
			return fmt.Errorf("failed to load cert and key: %w", err)
		}
		listenerFunc = tlsListenerFunc(cert)
		// Logged only after the key pair loaded, so the message is never
		// emitted for a configuration that then fails.
		logger.Infof("TLS enabled for listeners")
	}

	s.httpServers = []*http.HttpServer{}

	listener, err := listenerFunc(s.opts.ListenAddr)
	if err != nil {
		return fmt.Errorf("failed to listen on %s: %w", s.opts.ListenAddr, err)
	}

	// pprof's profile and trace endpoints keep the response open for the
	// duration of the capture, so they get a longer write timeout. The value
	// is chosen before NewServer is called so it is in effect regardless of
	// whether NewServer copies the options or retains the pointer.
	writeTimeout := 30 * time.Second
	if s.opts.EnablePprof {
		writeTimeout = 60 * time.Second
	}

	primaryHttp := http.NewServer(&http.ServerOpts{
		MaxConns:     s.opts.MaxConnections,
		WriteTimeout: writeTimeout,
		Listener:     listener,
	})

	primaryHttp.RegisterHandler("/metrics", promhttp.Handler())
	if s.opts.EnablePprof {
		primaryHttp.RegisterHandlerFunc("/debug/pprof/", pprof.Index)
		primaryHttp.RegisterHandlerFunc("/debug/pprof/cmdline", pprof.Cmdline)
		primaryHttp.RegisterHandlerFunc("/debug/pprof/profile", pprof.Profile)
		primaryHttp.RegisterHandlerFunc("/debug/pprof/symbol", pprof.Symbol)
		primaryHttp.RegisterHandlerFunc("/debug/pprof/trace", pprof.Trace)
	}

	for _, handler := range s.httpHandlers {
		primaryHttp.RegisterHandlerFunc(handler.Path, handler.Handler)
	}
	s.httpServers = append(s.httpServers, primaryHttp)

	// Each gRPC handler is served from its own listener on its own port.
	for _, handler := range s.grpcHandlers {
		grpcListener, err := listenerFunc(fmt.Sprintf(":%d", handler.Port))
		if err != nil {
			return fmt.Errorf("failed to listen on port %d: %w", handler.Port, err)
		}
		server := http.NewServer(&http.ServerOpts{
			MaxConns: s.opts.MaxConnections,
			Listener: grpcListener,
		})
		server.RegisterHandler(handler.Path, handler.Handler)
		s.httpServers = append(s.httpServers, server)
	}

	for _, httpServer := range s.httpServers {
		if err := httpServer.Open(ctx); err != nil {
			return fmt.Errorf("failed to open %s: %w", httpServer, err)
		}
		logger.Infof("Started %s", httpServer)
	}

	// Periodically report liveness for this region; exits when the derived
	// context is cancelled.
	go func() {
		ticker := time.NewTicker(time.Minute)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				metrics.CollectorHealthCheck.WithLabelValues(s.opts.Region).Set(1)
			}
		}
	}()

	opened = true
	return nil
}