in internal/stats_manager/stats_manager.go [282:370]
// statsPolling answers poll requests received on the polling pipe by sending
// the collected stats over the transfer pipe.
//
// It creates common.PollingPipe and opens it for reading, then creates
// common.TransferPipe and opens it for writing. It then loops: each line read
// from the polling pipe that contains "Poll at" triggers one pass over
// stMgrOpt.statsList. Entries with an empty Value, or whose Timestamp equals
// the one recorded in stMgrOpt.cmpTimeMap for that component, are skipped; the
// rest are JSON-encoded and written to the transfer pipe one per line, and
// their Timestamp is recorded in stMgrOpt.cmpTimeMap.
//
// Any failure to create, open, read from, or write to a pipe is logged,
// monitoring is disabled through disableMonitoring, and the function returns.
// A marshalling failure only skips the affected entry.
func statsPolling() {
	// create polling pipe
	if err := createPipe(common.PollingPipe); err != nil {
		log.Err("stats_manager::statsPolling : [%v]", err)
		disableMonitoring()
		return
	}

	// open polling pipe
	pf, err := os.OpenFile(common.PollingPipe, os.O_RDONLY, os.ModeNamedPipe)
	if err != nil {
		// Report through the logger and disable monitoring, matching every
		// other setup failure in this function.
		log.Err("stats_manager::statsPolling : unable to open pipe file [%v]", err)
		disableMonitoring()
		return
	}
	defer pf.Close()

	log.Info("stats_manager::statsPolling : opened polling pipe file")
	reader := bufio.NewReader(pf)

	// create transfer pipe
	if err = createPipe(common.TransferPipe); err != nil {
		log.Err("stats_manager::statsPolling : [%v]", err)
		disableMonitoring()
		return
	}

	// open transfer pipe
	tf, err := os.OpenFile(common.TransferPipe, os.O_CREATE|os.O_WRONLY, 0777)
	if err != nil {
		log.Err("stats_manager::statsPolling : unable to open pipe file [%v]", err)
		disableMonitoring()
		return
	}
	defer tf.Close()

	log.Info("stats_manager::statsPolling : opened transfer pipe file")

	// sendStats writes every changed, non-empty entry of the stats list to the
	// transfer pipe. It returns false once a write fails, after logging the
	// error and disabling monitoring, so the caller can stop polling.
	sendStats := func() bool {
		// TODO: check if this lock can be removed
		stMgrOpt.statsMtx.Lock()
		// Deferred so that every exit path below releases the lock.
		defer stMgrOpt.statsMtx.Unlock()

		for _, cmpSt := range stMgrOpt.statsList {
			if len(cmpSt.Value) == 0 {
				continue
			}

			if cmpSt.Timestamp == stMgrOpt.cmpTimeMap[cmpSt.ComponentName] {
				log.Debug("stats_manager::statsPolling : Skipping as there is no change in stats collected for %v", cmpSt.ComponentName)
				continue
			}

			msg, err := json.Marshal(cmpSt)
			if err != nil {
				log.Err("stats_manager::statsPolling : Unable to marshal [%v]", err)
				continue
			}

			// send the stats collected so far to transfer pipe
			stMgrOpt.transferMtx.Lock()
			_, err = fmt.Fprintf(tf, "%s\n", msg)
			stMgrOpt.transferMtx.Unlock()
			if err != nil {
				log.Err("stats_manager::statsPolling : Unable to write to pipe [%v]", err)
				disableMonitoring()
				return false
			}

			// Record the timestamp only after a successful write so that an
			// unsent entry is not treated as already delivered.
			stMgrOpt.cmpTimeMap[cmpSt.ComponentName] = cmpSt.Timestamp
		}

		return true
	}

	for {
		// read the polling message sent by stats monitor
		line, err := reader.ReadBytes('\n')
		if err != nil {
			log.Err("stats_manager::statsPolling : Unable to read from pipe [%v]", err)
			disableMonitoring()
			return
		}

		// validating poll message
		if !strings.Contains(string(line), "Poll at") {
			continue
		}

		// A failed write has already disabled monitoring; stop serving polls
		// instead of looping back to wait for the next one.
		if !sendStats() {
			return
		}
	}
}