func run()

in cmd/alertmanager/main.go [135:563]


func run() int {
	if os.Getenv("DEBUG") != "" {
		runtime.SetBlockProfileRate(20)
		runtime.SetMutexProfileFraction(20)
	}

	var (
		configFile          = kingpin.Flag("config.file", "Alertmanager configuration file name.").Default("alertmanager.yml").String()
		dataDir             = kingpin.Flag("storage.path", "Base path for data storage.").Default("data/").String()
		retention           = kingpin.Flag("data.retention", "How long to keep data for.").Default("120h").Duration()
		maintenanceInterval = kingpin.Flag("data.maintenance-interval", "Interval between garbage collection and snapshotting to disk of the silences and the notification logs.").Default("15m").Duration()
		alertGCInterval     = kingpin.Flag("alerts.gc-interval", "Interval between alert GC.").Default("30m").Duration()

		webConfig      = webflag.AddFlags(kingpin.CommandLine, ":9093")
		externalURL    = kingpin.Flag("web.external-url", "The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Alertmanager. If omitted, relevant URL components will be derived automatically.").String()
		routePrefix    = kingpin.Flag("web.route-prefix", "Prefix for the internal routes of web endpoints. Defaults to path of --web.external-url.").String()
		getConcurrency = kingpin.Flag("web.get-concurrency", "Maximum number of GET requests processed concurrently. If negative or zero, the limit is GOMAXPROC or 8, whichever is larger.").Default("0").Int()
		httpTimeout    = kingpin.Flag("web.timeout", "Timeout for HTTP requests. If negative or zero, no timeout is set.").Default("0").Duration()

		clusterBindAddr = kingpin.Flag("cluster.listen-address", "Listen address for cluster. Set to empty string to disable HA mode.").
				Default(defaultClusterAddr).String()
		clusterAdvertiseAddr   = kingpin.Flag("cluster.advertise-address", "Explicit address to advertise in cluster.").String()
		peers                  = kingpin.Flag("cluster.peer", "Initial peers (may be repeated).").Strings()
		peerTimeout            = kingpin.Flag("cluster.peer-timeout", "Time to wait between peers to send notifications.").Default("15s").Duration()
		gossipInterval         = kingpin.Flag("cluster.gossip-interval", "Interval between sending gossip messages. By lowering this value (more frequent) gossip messages are propagated across the cluster more quickly at the expense of increased bandwidth.").Default(cluster.DefaultGossipInterval.String()).Duration()
		pushPullInterval       = kingpin.Flag("cluster.pushpull-interval", "Interval for gossip state syncs. Setting this interval lower (more frequent) will increase convergence speeds across larger clusters at the expense of increased bandwidth usage.").Default(cluster.DefaultPushPullInterval.String()).Duration()
		tcpTimeout             = kingpin.Flag("cluster.tcp-timeout", "Timeout for establishing a stream connection with a remote node for a full state sync, and for stream read and write operations.").Default(cluster.DefaultTCPTimeout.String()).Duration()
		probeTimeout           = kingpin.Flag("cluster.probe-timeout", "Timeout to wait for an ack from a probed node before assuming it is unhealthy. This should be set to 99-percentile of RTT (round-trip time) on your network.").Default(cluster.DefaultProbeTimeout.String()).Duration()
		probeInterval          = kingpin.Flag("cluster.probe-interval", "Interval between random node probes. Setting this lower (more frequent) will cause the cluster to detect failed nodes more quickly at the expense of increased bandwidth usage.").Default(cluster.DefaultProbeInterval.String()).Duration()
		settleTimeout          = kingpin.Flag("cluster.settle-timeout", "Maximum time to wait for cluster connections to settle before evaluating notifications.").Default(cluster.DefaultPushPullInterval.String()).Duration()
		reconnectInterval      = kingpin.Flag("cluster.reconnect-interval", "Interval between attempting to reconnect to lost peers.").Default(cluster.DefaultReconnectInterval.String()).Duration()
		peerReconnectTimeout   = kingpin.Flag("cluster.reconnect-timeout", "Length of time to attempt to reconnect to a lost peer.").Default(cluster.DefaultReconnectTimeout.String()).Duration()
		tlsConfigFile          = kingpin.Flag("cluster.tls-config", "[EXPERIMENTAL] Path to config yaml file that can enable mutual TLS within the gossip protocol.").Default("").String()
		allowInsecureAdvertise = kingpin.Flag("cluster.allow-insecure-public-advertise-address-discovery", "[EXPERIMENTAL] Allow alertmanager to discover and listen on a public IP address.").Bool()
		label                  = kingpin.Flag("cluster.label", "The cluster label is an optional string to include on each packet and stream. It uniquely identifies the cluster and prevents cross-communication issues when sending gossip messages.").Default("").String()
		featureFlags           = kingpin.Flag("enable-feature", fmt.Sprintf("Experimental features to enable. The flag can be repeated to enable multiple features. Valid options: %s", strings.Join(featurecontrol.AllowedFlags, ", "))).Default("").String()
	)

	promlogflag.AddFlags(kingpin.CommandLine, &promlogConfig)
	kingpin.CommandLine.UsageWriter(os.Stdout)

	kingpin.Version(version.Print("alertmanager"))
	kingpin.CommandLine.GetFlag("help").Short('h')
	// Read any other flags from EXTRA_ARGS.
	logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
	if extraArgs, err := exportsetup.ExtraArgs(); err != nil {
		level.Error(logger).Log("msg", "Error parsing commandline arguments", "err", err)
		kingpin.CommandLine.Usage(os.Args[1:])
		os.Exit(2)
	} else if _, err := kingpin.CommandLine.Parse(append(os.Args[1:], extraArgs...)); err != nil {
		level.Error(logger).Log("msg", "Error parsing commandline arguments", "err", err)
		kingpin.CommandLine.Usage(os.Args[1:])
		os.Exit(2)
	}

	logger = promlog.New(&promlogConfig)

	level.Info(logger).Log("msg", "Starting Alertmanager", "version", version.Info())
	level.Info(logger).Log("build_context", version.BuildContext())

	ff, err := featurecontrol.NewFlags(logger, *featureFlags)
	if err != nil {
		level.Error(logger).Log("msg", "error parsing the feature flag list", "err", err)
		return 1
	}
	compat.InitFromFlags(logger, ff)

	err = os.MkdirAll(*dataDir, 0o777)
	if err != nil {
		level.Error(logger).Log("msg", "Unable to create data directory", "err", err)
		return 1
	}

	tlsTransportConfig, err := cluster.GetTLSTransportConfig(*tlsConfigFile)
	if err != nil {
		level.Error(logger).Log("msg", "unable to initialize TLS transport configuration for gossip mesh", "err", err)
		return 1
	}
	var peer *cluster.Peer
	if *clusterBindAddr != "" {
		peer, err = cluster.Create(
			log.With(logger, "component", "cluster"),
			prometheus.DefaultRegisterer,
			*clusterBindAddr,
			*clusterAdvertiseAddr,
			*peers,
			true,
			*pushPullInterval,
			*gossipInterval,
			*tcpTimeout,
			*probeTimeout,
			*probeInterval,
			tlsTransportConfig,
			*allowInsecureAdvertise,
			*label,
		)
		if err != nil {
			level.Error(logger).Log("msg", "unable to initialize gossip mesh", "err", err)
			return 1
		}
		clusterEnabled.Set(1)
	}

	stopc := make(chan struct{})
	var wg sync.WaitGroup

	notificationLogOpts := nflog.Options{
		SnapshotFile: filepath.Join(*dataDir, "nflog"),
		Retention:    *retention,
		Logger:       log.With(logger, "component", "nflog"),
		Metrics:      prometheus.DefaultRegisterer,
	}

	notificationLog, err := nflog.New(notificationLogOpts)
	if err != nil {
		level.Error(logger).Log("err", err)
		return 1
	}
	if peer != nil {
		c := peer.AddState("nfl", notificationLog, prometheus.DefaultRegisterer)
		notificationLog.SetBroadcast(c.Broadcast)
	}

	wg.Add(1)
	go func() {
		notificationLog.Maintenance(*maintenanceInterval, filepath.Join(*dataDir, "nflog"), stopc, nil)
		wg.Done()
	}()

	marker := types.NewMarker(prometheus.DefaultRegisterer)

	silenceOpts := silence.Options{
		SnapshotFile: filepath.Join(*dataDir, "silences"),
		Retention:    *retention,
		Logger:       log.With(logger, "component", "silences"),
		Metrics:      prometheus.DefaultRegisterer,
	}

	silences, err := silence.New(silenceOpts)
	if err != nil {
		level.Error(logger).Log("err", err)
		return 1
	}
	if peer != nil {
		c := peer.AddState("sil", silences, prometheus.DefaultRegisterer)
		silences.SetBroadcast(c.Broadcast)
	}

	// Start providers before router potentially sends updates.
	wg.Add(1)
	go func() {
		silences.Maintenance(*maintenanceInterval, filepath.Join(*dataDir, "silences"), stopc, nil)
		wg.Done()
	}()

	defer func() {
		close(stopc)
		wg.Wait()
	}()

	// Peer state listeners have been registered, now we can join and get the initial state.
	if peer != nil {
		err = peer.Join(
			*reconnectInterval,
			*peerReconnectTimeout,
		)
		if err != nil {
			level.Warn(logger).Log("msg", "unable to join gossip mesh", "err", err)
		}
		ctx, cancel := context.WithTimeout(context.Background(), *settleTimeout)
		defer func() {
			cancel()
			if err := peer.Leave(10 * time.Second); err != nil {
				level.Warn(logger).Log("msg", "unable to leave gossip mesh", "err", err)
			}
		}()
		go peer.Settle(ctx, *gossipInterval*10)
	}

	alerts, err := mem.NewAlerts(context.Background(), marker, *alertGCInterval, nil, logger, prometheus.DefaultRegisterer)
	if err != nil {
		level.Error(logger).Log("err", err)
		return 1
	}
	defer alerts.Close()

	var disp *dispatch.Dispatcher
	defer func() {
		disp.Stop()
	}()

	groupFn := func(routeFilter func(*dispatch.Route) bool, alertFilter func(*types.Alert, time.Time) bool) (dispatch.AlertGroups, map[model.Fingerprint][]string) {
		return disp.Groups(routeFilter, alertFilter)
	}

	// An interface value that holds a nil concrete value is non-nil.
	// Therefore we explicly pass an empty interface, to detect if the
	// cluster is not enabled in notify.
	var clusterPeer cluster.ClusterPeer
	if peer != nil {
		clusterPeer = peer
	}

	api, err := api.New(api.Options{
		Alerts:      alerts,
		Silences:    silences,
		StatusFunc:  marker.Status,
		Peer:        clusterPeer,
		Timeout:     *httpTimeout,
		Concurrency: *getConcurrency,
		Logger:      log.With(logger, "component", "api"),
		Registry:    prometheus.DefaultRegisterer,
		GroupFunc:   groupFn,
	})
	if err != nil {
		level.Error(logger).Log("err", fmt.Errorf("failed to create API: %w", err))
		return 1
	}

	amURL, err := extURL(logger, os.Hostname, (*webConfig.WebListenAddresses)[0], *externalURL)
	if err != nil {
		level.Error(logger).Log("msg", "failed to determine external URL", "err", err)
		return 1
	}
	level.Debug(logger).Log("externalURL", amURL.String())

	waitFunc := func() time.Duration { return 0 }
	if peer != nil {
		waitFunc = clusterWait(peer, *peerTimeout)
	}
	timeoutFunc := func(d time.Duration) time.Duration {
		if d < notify.MinTimeout {
			d = notify.MinTimeout
		}
		return d + waitFunc()
	}

	var (
		inhibitor *inhibit.Inhibitor
		tmpl      *template.Template
	)

	dispMetrics := dispatch.NewDispatcherMetrics(false, prometheus.DefaultRegisterer)
	pipelineBuilder := notify.NewPipelineBuilder(prometheus.DefaultRegisterer, ff)
	configLogger := log.With(logger, "component", "configuration")
	configCoordinator := config.NewCoordinator(
		*configFile,
		prometheus.DefaultRegisterer,
		configLogger,
	)
	configCoordinator.Subscribe(func(conf *config.Config) error {
		tmpl, err = template.FromGlobs(conf.Templates)
		if err != nil {
			return fmt.Errorf("failed to parse templates: %w", err)
		}
		if externalURL := conf.GoogleCloud.ExternalURL; externalURL != nil {
			tmpl.ExternalURL = externalURL.URL
		} else {
			tmpl.ExternalURL = amURL
		}

		// Build the routing tree and record which receivers are used.
		routes := dispatch.NewRoute(conf.Route, nil)
		activeReceivers := make(map[string]struct{})
		routes.Walk(func(r *dispatch.Route) {
			activeReceivers[r.RouteOpts.Receiver] = struct{}{}
		})

		// Build the map of receiver to integrations.
		receivers := make(map[string][]notify.Integration, len(activeReceivers))
		var integrationsNum int
		for _, rcv := range conf.Receivers {
			if _, found := activeReceivers[rcv.Name]; !found {
				// No need to build a receiver if no route is using it.
				level.Info(configLogger).Log("msg", "skipping creation of receiver not referenced by any route", "receiver", rcv.Name)
				continue
			}
			integrations, err := receiver.BuildReceiverIntegrations(rcv, tmpl, logger)
			if err != nil {
				return err
			}
			// rcv.Name is guaranteed to be unique across all receivers.
			receivers[rcv.Name] = integrations
			integrationsNum += len(integrations)
		}

		// Build the map of time interval names to time interval definitions.
		timeIntervals := make(map[string][]timeinterval.TimeInterval, len(conf.MuteTimeIntervals)+len(conf.TimeIntervals))
		for _, ti := range conf.MuteTimeIntervals {
			timeIntervals[ti.Name] = ti.TimeIntervals
		}

		for _, ti := range conf.TimeIntervals {
			timeIntervals[ti.Name] = ti.TimeIntervals
		}

		intervener := timeinterval.NewIntervener(timeIntervals)

		inhibitor.Stop()
		disp.Stop()

		inhibitor = inhibit.NewInhibitor(alerts, conf.InhibitRules, marker, logger)
		silencer := silence.NewSilencer(silences, marker, logger)

		// An interface value that holds a nil concrete value is non-nil.
		// Therefore we explicly pass an empty interface, to detect if the
		// cluster is not enabled in notify.
		var pipelinePeer notify.Peer
		if peer != nil {
			pipelinePeer = peer
		}

		pipeline := pipelineBuilder.New(
			receivers,
			waitFunc,
			inhibitor,
			silencer,
			intervener,
			notificationLog,
			pipelinePeer,
		)

		configuredReceivers.Set(float64(len(activeReceivers)))
		configuredIntegrations.Set(float64(integrationsNum))
		configuredInhibitionRules.Set(float64(len(conf.InhibitRules)))

		api.Update(conf, func(labels model.LabelSet) {
			inhibitor.Mutes(labels)
			silencer.Mutes(labels)
		})

		disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, nil, logger, dispMetrics)
		routes.Walk(func(r *dispatch.Route) {
			if r.RouteOpts.RepeatInterval > *retention {
				level.Warn(configLogger).Log(
					"msg",
					"repeat_interval is greater than the data retention period. It can lead to notifications being repeated more often than expected.",
					"repeat_interval",
					r.RouteOpts.RepeatInterval,
					"retention",
					*retention,
					"route",
					r.Key(),
				)
			}

			if r.RouteOpts.RepeatInterval < r.RouteOpts.GroupInterval {
				level.Warn(configLogger).Log(
					"msg",
					"repeat_interval is less than group_interval. Notifications will not repeat until the next group_interval.",
					"repeat_interval",
					r.RouteOpts.RepeatInterval,
					"group_interval",
					r.RouteOpts.GroupInterval,
					"route",
					r.Key(),
				)
			}
		})

		go disp.Run()
		go inhibitor.Run()

		return nil
	})

	if err := configCoordinator.Reload(); err != nil {
		return 1
	}

	// Make routePrefix default to externalURL path if empty string.
	if *routePrefix == "" {
		*routePrefix = amURL.Path
	}
	*routePrefix = "/" + strings.Trim(*routePrefix, "/")
	level.Debug(logger).Log("routePrefix", *routePrefix)

	router := route.New().WithInstrumentation(instrumentHandler)
	if *routePrefix != "/" {
		router.Get("/", func(w http.ResponseWriter, r *http.Request) {
			http.Redirect(w, r, *routePrefix, http.StatusFound)
		})
		router = router.WithPrefix(*routePrefix)
	}

	webReload := make(chan chan error)

	ui.Register(router, webReload, logger)
	reactapp.Register(router, logger)

	mux := api.Register(router, *routePrefix)

	srv := &http.Server{Handler: mux}
	srvc := make(chan struct{})

	go func() {
		if err := web.ListenAndServe(srv, webConfig, logger); !errors.Is(err, http.ErrServerClosed) {
			level.Error(logger).Log("msg", "Listen error", "err", err)
			close(srvc)
		}
		defer func() {
			if err := srv.Close(); err != nil {
				level.Error(logger).Log("msg", "Error on closing the server", "err", err)
			}
		}()
	}()

	var (
		hup  = make(chan os.Signal, 1)
		term = make(chan os.Signal, 1)
	)
	signal.Notify(hup, syscall.SIGHUP)
	signal.Notify(term, os.Interrupt, syscall.SIGTERM)

	for {
		select {
		case <-hup:
			// ignore error, already logged in `reload()`
			_ = configCoordinator.Reload()
		case errc := <-webReload:
			errc <- configCoordinator.Reload()
		case <-term:
			level.Info(logger).Log("msg", "Received SIGTERM, exiting gracefully...")
			return 0
		case <-srvc:
			return 1
		}
	}
}