in internal/stats_manager/stats_manager.go [193:280]
// statsDumper is the worker loop that consumes messages from sc.channel until
// the channel is closed or the transfer pipe becomes unusable.
//
// Event messages are serialized as one JSON object per line and written to
// common.TransferPipe; all other messages are treated as stat updates and are
// accumulated into the stats list entry at sc.compIdx. If the pipe cannot be
// created, opened or written to, the error is logged, disableMonitoring is
// called and the worker exits. sc.workerDone is marked done on every exit path.
func (sc *StatsCollector) statsDumper() {
	defer sc.workerDone.Done()

	err := createPipe(common.TransferPipe)
	if err != nil {
		log.Err("stats_manager::statsDumper : [%v]", err)
		disableMonitoring()
		return
	}

	f, err := os.OpenFile(common.TransferPipe, os.O_CREATE|os.O_WRONLY, 0777)
	if err != nil {
		log.Err("stats_manager::statsDumper : unable to open pipe file [%v]", err)
		disableMonitoring()
		return
	}
	defer f.Close()

	log.Info("stats_manager::statsDumper : opened transfer pipe file")

	for st := range sc.channel {
		// log.Debug("stats_manager::statsDumper : stats: %v", st)

		if !st.IsEvent {
			// Checked assertion: a malformed message must not panic the worker.
			stat, ok := st.CompMsg.(Stats)
			if !ok {
				log.Err("stats_manager::statsDumper : Unexpected stats payload type %T", st.CompMsg)
				continue
			}
			sc.accumulateStat(stat)
			continue
		}

		event, ok := st.CompMsg.(Events)
		if !ok {
			log.Err("stats_manager::statsDumper : Unexpected event payload type %T", st.CompMsg)
			continue
		}

		pipeMsg := PipeMsg{
			Timestamp:     event.Timestamp,
			ComponentName: stMgrOpt.statsList[sc.compIdx].ComponentName,
			Operation:     event.Operation,
			Path:          event.Path,
			Value:         event.Value,
		}

		msg, err := json.Marshal(pipeMsg)
		if err != nil {
			// A single unserializable event is dropped; the worker keeps running.
			log.Err("stats_manager::statsDumper : Unable to marshal [%v]", err)
			continue
		}

		// log.Debug("stats_manager::statsDumper : stats: %v", string(msg))

		// transferMtx is held only for the duration of the pipe write.
		stMgrOpt.transferMtx.Lock()
		_, err = fmt.Fprintf(f, "%s\n", msg)
		stMgrOpt.transferMtx.Unlock()
		if err != nil {
			// The pipe is unusable: stop monitoring and end the worker.
			log.Err("stats_manager::statsDumper : Unable to write to pipe [%v]", err)
			disableMonitoring()
			return
		}
	}
}

// accumulateStat applies a single stat update to the stats list entry at
// sc.compIdx while holding stMgrOpt.statsMtx.
//
// A key seen for the first time starts at int64(0). Increment and Decrement
// require both the stored value and stat.Value to be int64; any other type is
// logged and the update is skipped rather than panicking on a failed type
// assertion (Replace stores stat.Value as-is, so a stored value is not
// guaranteed to be an int64). The entry's Timestamp is refreshed only when
// the update was applied.
func (sc *StatsCollector) accumulateStat(stat Stats) {
	idx := sc.compIdx

	// TODO: check if this lock can be removed
	stMgrOpt.statsMtx.Lock()
	defer stMgrOpt.statsMtx.Unlock()

	// Maps are reference types, so writes through values update the entry.
	values := stMgrOpt.statsList[idx].Value
	if _, isPresent := values[stat.Key]; !isPresent {
		values[stat.Key] = (int64)(0)
	}

	switch stat.Operation {
	case Increment, Decrement:
		current, currentOK := values[stat.Key].(int64)
		delta, deltaOK := stat.Value.(int64)
		if !currentOK || !deltaOK {
			log.Err("stats_manager::statsDumper : Non int64 value for key %v of component %v (stored %T, update %T)",
				stat.Key, stMgrOpt.statsList[idx].ComponentName, values[stat.Key], stat.Value)
			return
		}

		if stat.Operation == Increment {
			values[stat.Key] = current + delta
		} else {
			updated := current - delta
			values[stat.Key] = updated
			if updated < 0 {
				log.Err("stats_manager::statsDumper : Negative value %v after decrement of %v for component %v",
					updated, stat.Key, stMgrOpt.statsList[idx].ComponentName)
			}
		}

	case Replace:
		values[stat.Key] = stat.Value

	default:
		log.Debug("stats_manager::statsDumper : Incorrect operation for stats collection")
		return
	}

	stMgrOpt.statsList[idx].Timestamp = stat.Timestamp
}