func()

in internal/stats_manager/stats_manager.go [193:280]


// statsDumper is the worker loop that drains sc.channel until the channel is
// closed or the transfer pipe can no longer be written to.
//
// Each received message is handled in one of two ways:
//   - events (st.IsEvent) are converted to a PipeMsg, JSON-encoded and written
//     as a single newline-terminated line to the file at common.TransferPipe;
//   - stats are folded into stMgrOpt.statsList[sc.compIdx].Value according to
//     the requested operation (Increment, Decrement or Replace), and the
//     component's Timestamp is refreshed.
//
// Failure to set up or open the pipe, or to write to it, calls
// disableMonitoring() and ends the worker. Messages whose payload does not have
// the expected dynamic type are logged and skipped instead of panicking.
// sc.workerDone is signalled when the function returns.
func (sc *StatsCollector) statsDumper() {
	defer sc.workerDone.Done()

	err := createPipe(common.TransferPipe)
	if err != nil {
		log.Err("stats_manager::statsDumper : [%v]", err)
		disableMonitoring()
		return
	}

	f, err := os.OpenFile(common.TransferPipe, os.O_CREATE|os.O_WRONLY, 0777)
	if err != nil {
		log.Err("stats_manager::statsDumper : unable to open pipe file [%v]", err)
		disableMonitoring()
		return
	}
	defer f.Close()

	log.Info("stats_manager::statsDumper : opened transfer pipe file")

	for st := range sc.channel {
		// log.Debug("stats_manager::statsDumper : stats: %v", st)

		idx := sc.compIdx

		if st.IsEvent {
			event, ok := st.CompMsg.(Events)
			if !ok {
				log.Err("stats_manager::statsDumper : Unexpected event message type %T", st.CompMsg)
				continue
			}

			pipeMsg := PipeMsg{
				Timestamp:     event.Timestamp,
				ComponentName: stMgrOpt.statsList[idx].ComponentName,
				Operation:     event.Operation,
				Path:          event.Path,
				Value:         event.Value,
			}

			msg, err := json.Marshal(pipeMsg)
			if err != nil {
				log.Err("stats_manager::statsDumper : Unable to marshal [%v]", err)
				continue
			}

			// log.Debug("stats_manager::statsDumper : stats: %v", string(msg))

			// One formatted write per event so a line is never split across
			// writes while transferMtx is held.
			stMgrOpt.transferMtx.Lock()
			_, err = fmt.Fprintf(f, "%s\n", msg)
			stMgrOpt.transferMtx.Unlock()
			if err != nil {
				log.Err("stats_manager::statsDumper : Unable to write to pipe [%v]", err)
				disableMonitoring()
				return
			}

			continue
		}

		// accumulate component level stats
		stat, ok := st.CompMsg.(Stats)
		if !ok {
			log.Err("stats_manager::statsDumper : Unexpected stats message type %T", st.CompMsg)
			continue
		}

		// The critical section lives in its own function so the deferred
		// unlock covers every exit path.
		func() {
			// TODO: check if this lock can be removed
			stMgrOpt.statsMtx.Lock()
			defer stMgrOpt.statsMtx.Unlock()

			// Maps are reference values, so writes through this alias update
			// the component's own Value map.
			values := stMgrOpt.statsList[idx].Value

			if _, isPresent := values[stat.Key]; !isPresent {
				values[stat.Key] = (int64)(0)
			}

			switch stat.Operation {
			case Increment, Decrement:
				// A previous Replace may have stored a non-int64 value under
				// this key, so both operands are checked before doing arithmetic.
				current, currentOK := values[stat.Key].(int64)
				delta, deltaOK := stat.Value.(int64)
				if !currentOK || !deltaOK {
					log.Err("stats_manager::statsDumper : Non-int64 operand for key %v of component %v (current %T, update %T)",
						stat.Key, stMgrOpt.statsList[idx].ComponentName, values[stat.Key], stat.Value)
					return
				}

				if stat.Operation == Increment {
					values[stat.Key] = current + delta
				} else {
					values[stat.Key] = current - delta
					if current-delta < 0 {
						log.Err("stats_manager::statsDumper : Negative value %v after decrement of %v for component %v",
							values[stat.Key], stat.Key, stMgrOpt.statsList[idx].ComponentName)
					}
				}

			case Replace:
				values[stat.Key] = stat.Value

			default:
				// Unknown operation: leave the timestamp untouched.
				log.Debug("stats_manager::statsDumper : Incorrect operation for stats collection")
				return
			}

			stMgrOpt.statsList[idx].Timestamp = stat.Timestamp
		}()
	}
}