func Start()

in traffic_monitor/manager/manager.go [44:188]


func Start(opsConfigFile string, cfg config.Config, appData config.StaticAppData, trafficMonitorConfigFileName string) error {
	toSession := towrap.NewTrafficOpsSessionThreadsafe(nil, nil, cfg.CRConfigHistoryCount, cfg)

	localStates := peer.NewCRStatesThreadsafe() // this is the local state as discoverer by this traffic_monitor
	fetchCount := threadsafe.NewUint()          // note this is the number of individual caches fetched from, not the number of times all the caches were polled.
	healthIteration := threadsafe.NewUint()
	errorCount := threadsafe.NewUint()

	toData := todata.NewThreadsafe()

	// makes results chan
	cacheHealthHandler := cache.NewHandler()
	// passes results chan to poller
	cacheHealthPoller := poller.NewCache(true, cacheHealthHandler, cfg, appData)
	cacheStatHandler := cache.NewPrecomputeHandler(toData)
	cacheStatPoller := poller.NewCache(false, cacheStatHandler, cfg, appData)
	monitorConfigPoller := poller.NewMonitorConfig(cfg.MonitorConfigPollingInterval)
	peerHandler := peer.NewHandler()
	peerPoller := poller.NewPeer(peerHandler, cfg, appData)
	distributedPeerHandler := peer.NewHandler()
	distributedPeerPoller := poller.NewPeer(distributedPeerHandler, cfg, appData)

	go monitorConfigPoller.Poll()
	go cacheHealthPoller.Poll()
	if cfg.StatPolling {
		go cacheStatPoller.Poll()
	}
	go peerPoller.Poll()
	if cfg.DistributedPolling {
		go distributedPeerPoller.Poll()
	}

	events := health.NewThreadsafeEvents(cfg.MaxEvents)

	var cachesChangedForStatMgr chan struct{}
	var cachesChangedForHealthMgr chan struct{}
	var cachesChanged chan struct{}
	if cfg.StatPolling {
		cachesChangedForStatMgr = make(chan struct{})
		cachesChanged = cachesChangedForStatMgr
	} else {
		cachesChangedForHealthMgr = make(chan struct{})
		cachesChanged = cachesChangedForHealthMgr
	}
	peerStates := peer.NewCRStatesPeersThreadsafe(cfg.PeerOptimisticQuorumMin) // each peer's last state is saved in this map
	distributedPeerStates := peer.NewCRStatesPeersThreadsafe(0)

	monitorConfig := StartMonitorConfigManager(
		monitorConfigPoller.ConfigChannel,
		localStates,
		peerStates,
		distributedPeerStates,
		cacheStatPoller.ConfigChannel,
		cacheHealthPoller.ConfigChannel,
		peerPoller.ConfigChannel,
		distributedPeerPoller.ConfigChannel,
		monitorConfigPoller.IntervalChan,
		cachesChanged,
		cfg,
		appData,
		toSession,
		toData,
	)

	combinedStates, combineStateFunc := StartStateCombiner(events, peerStates, localStates, toData)

	StartPeerManager(
		peerHandler.ResultChannel,
		peerStates,
		events,
		combineStateFunc,
	)

	statInfoHistory, statResultHistory, statMaxKbpses, _, lastKbpsStats, dsStats, statUnpolledCaches, localCacheStatus := StartStatHistoryManager(
		cacheStatHandler.ResultChan(),
		localStates,
		combinedStates,
		toData,
		cachesChangedForStatMgr,
		cfg,
		monitorConfig,
		events,
		combineStateFunc,
	)

	lastHealthDurations, healthHistory, healthUnpolledCaches := StartHealthResultManager(
		cacheHealthHandler.ResultChan(),
		toData,
		localStates,
		monitorConfig,
		fetchCount,
		cfg,
		events,
		localCacheStatus,
		cachesChangedForHealthMgr,
		combineStateFunc,
	)

	StartDistributedPeerManager(
		distributedPeerHandler.ResultChannel,
		localStates,
		distributedPeerStates,
		events,
		healthUnpolledCaches,
	)

	if _, err := StartOpsConfigManager(
		opsConfigFile,
		toSession,
		toData,
		[]chan<- handler.OpsConfig{monitorConfigPoller.OpsConfigChannel},
		[]chan<- towrap.TrafficOpsSessionThreadsafe{monitorConfigPoller.SessionChannel},
		localStates,
		peerStates,
		distributedPeerStates,
		combinedStates,
		statInfoHistory,
		statResultHistory,
		statMaxKbpses,
		healthHistory,
		lastKbpsStats,
		dsStats,
		events,
		appData,
		cacheHealthPoller.Config.Interval,
		lastHealthDurations,
		fetchCount,
		healthIteration,
		errorCount,
		localCacheStatus,
		statUnpolledCaches,
		healthUnpolledCaches,
		monitorConfig,
		cfg,
	); err != nil {
		return fmt.Errorf("starting ops config manager: %v", err)
	}

	if err := startMonitorConfigFilePoller(trafficMonitorConfigFileName); err != nil {
		return fmt.Errorf("starting monitor config file poller: %v", err)
	}

	healthTickListener(cacheHealthPoller.TickChan, healthIteration)
	return nil
}