in traffic_monitor/manager/manager.go [44:188]
// Start constructs the pollers, handlers, and shared thread-safe state used by
// the monitor, launches the poller goroutines, and starts each manager with the
// channels and state it needs.
//
// opsConfigFile is handed to the ops config manager, and
// trafficMonitorConfigFileName to the monitor config file poller. cfg and
// appData are passed through to the pollers and managers.
//
// It returns a wrapped error if the ops config manager or the monitor config
// file poller fails to start, and nil otherwise.
//
// NOTE(review): healthTickListener is invoked synchronously just before the
// final return; whether it blocks is not visible from this function — confirm
// against its definition.
func Start(opsConfigFile string, cfg config.Config, appData config.StaticAppData, trafficMonitorConfigFileName string) error {
	toSession := towrap.NewTrafficOpsSessionThreadsafe(nil, nil, cfg.CRConfigHistoryCount, cfg)

	localStates := peer.NewCRStatesThreadsafe() // this is the local state as discovered by this traffic_monitor

	fetchCount := threadsafe.NewUint() // note this is the number of individual caches fetched from, not the number of times all the caches were polled.
	healthIteration := threadsafe.NewUint()
	errorCount := threadsafe.NewUint()

	toData := todata.NewThreadsafe()

	// makes results chan
	cacheHealthHandler := cache.NewHandler()
	// passes results chan to poller
	cacheHealthPoller := poller.NewCache(true, cacheHealthHandler, cfg, appData)
	cacheStatHandler := cache.NewPrecomputeHandler(toData)
	cacheStatPoller := poller.NewCache(false, cacheStatHandler, cfg, appData)
	monitorConfigPoller := poller.NewMonitorConfig(cfg.MonitorConfigPollingInterval)
	peerHandler := peer.NewHandler()
	peerPoller := poller.NewPeer(peerHandler, cfg, appData)
	distributedPeerHandler := peer.NewHandler()
	distributedPeerPoller := poller.NewPeer(distributedPeerHandler, cfg, appData)

	// The stat poller and the distributed peer poller are always constructed
	// (their ConfigChannels are passed to the monitor config manager below),
	// but their Poll loops are only launched when enabled in cfg.
	go monitorConfigPoller.Poll()
	go cacheHealthPoller.Poll()
	if cfg.StatPolling {
		go cacheStatPoller.Poll()
	}
	go peerPoller.Poll()
	if cfg.DistributedPolling {
		go distributedPeerPoller.Poll()
	}

	events := health.NewThreadsafeEvents(cfg.MaxEvents)

	// Exactly one of the two manager-specific channels is created, selected by
	// cfg.StatPolling; the other stays nil. cachesChanged refers to whichever
	// one was created and is the channel given to the monitor config manager.
	var cachesChangedForStatMgr chan struct{}
	var cachesChangedForHealthMgr chan struct{}
	var cachesChanged chan struct{}
	if cfg.StatPolling {
		cachesChangedForStatMgr = make(chan struct{})
		cachesChanged = cachesChangedForStatMgr
	} else {
		cachesChangedForHealthMgr = make(chan struct{})
		cachesChanged = cachesChangedForHealthMgr
	}

	peerStates := peer.NewCRStatesPeersThreadsafe(cfg.PeerOptimisticQuorumMin) // each peer's last state is saved in this map
	distributedPeerStates := peer.NewCRStatesPeersThreadsafe(0)

	monitorConfig := StartMonitorConfigManager(
		monitorConfigPoller.ConfigChannel,
		localStates,
		peerStates,
		distributedPeerStates,
		cacheStatPoller.ConfigChannel,
		cacheHealthPoller.ConfigChannel,
		peerPoller.ConfigChannel,
		distributedPeerPoller.ConfigChannel,
		monitorConfigPoller.IntervalChan,
		cachesChanged,
		cfg,
		appData,
		toSession,
		toData,
	)

	combinedStates, combineStateFunc := StartStateCombiner(events, peerStates, localStates, toData)

	StartPeerManager(
		peerHandler.ResultChannel,
		peerStates,
		events,
		combineStateFunc,
	)

	// The fourth return value of StartStatHistoryManager is not used here.
	statInfoHistory, statResultHistory, statMaxKbpses, _, lastKbpsStats, dsStats, statUnpolledCaches, localCacheStatus := StartStatHistoryManager(
		cacheStatHandler.ResultChan(),
		localStates,
		combinedStates,
		toData,
		cachesChangedForStatMgr,
		cfg,
		monitorConfig,
		events,
		combineStateFunc,
	)

	lastHealthDurations, healthHistory, healthUnpolledCaches := StartHealthResultManager(
		cacheHealthHandler.ResultChan(),
		toData,
		localStates,
		monitorConfig,
		fetchCount,
		cfg,
		events,
		localCacheStatus,
		cachesChangedForHealthMgr,
		combineStateFunc,
	)

	StartDistributedPeerManager(
		distributedPeerHandler.ResultChannel,
		localStates,
		distributedPeerStates,
		events,
		healthUnpolledCaches,
	)

	// Only the error from StartOpsConfigManager is needed; its first return
	// value is discarded. The error is wrapped with %w so callers can still
	// inspect the underlying cause with errors.Is / errors.As.
	if _, err := StartOpsConfigManager(
		opsConfigFile,
		toSession,
		toData,
		[]chan<- handler.OpsConfig{monitorConfigPoller.OpsConfigChannel},
		[]chan<- towrap.TrafficOpsSessionThreadsafe{monitorConfigPoller.SessionChannel},
		localStates,
		peerStates,
		distributedPeerStates,
		combinedStates,
		statInfoHistory,
		statResultHistory,
		statMaxKbpses,
		healthHistory,
		lastKbpsStats,
		dsStats,
		events,
		appData,
		cacheHealthPoller.Config.Interval,
		lastHealthDurations,
		fetchCount,
		healthIteration,
		errorCount,
		localCacheStatus,
		statUnpolledCaches,
		healthUnpolledCaches,
		monitorConfig,
		cfg,
	); err != nil {
		return fmt.Errorf("starting ops config manager: %w", err)
	}

	if err := startMonitorConfigFilePoller(trafficMonitorConfigFileName); err != nil {
		return fmt.Errorf("starting monitor config file poller: %w", err)
	}

	healthTickListener(cacheHealthPoller.TickChan, healthIteration)
	return nil
}