func statsPolling()

in internal/stats_manager/stats_manager.go [282:370]


// statsPolling answers poll requests received on the polling pipe by writing
// the most recently collected component stats to the transfer pipe.
//
// It creates common.PollingPipe and opens it for reading, creates
// common.TransferPipe and opens it for writing, and then loops reading
// newline-terminated messages from the polling pipe. Messages that do not
// contain "Poll at" are ignored; for every other message the changed entries
// of stMgrOpt.statsList are sent through writeChangedStats.
//
// Any failure to set up a pipe, to read a poll message or to write stats is
// logged, monitoring is turned off through disableMonitoring, and the function
// returns, closing whichever pipes it had opened.
func statsPolling() {
	// create polling pipe
	if err := createPipe(common.PollingPipe); err != nil {
		log.Err("stats_manager::statsPolling : [%v]", err)
		disableMonitoring()
		return
	}

	// open polling pipe
	pf, err := os.OpenFile(common.PollingPipe, os.O_RDONLY, os.ModeNamedPipe)
	if err != nil {
		// Report through the logger and disable monitoring, like every other
		// setup failure in this function.
		log.Err("stats_manager::statsPolling : unable to open pipe file [%v]", err)
		disableMonitoring()
		return
	}
	defer pf.Close()

	log.Info("stats_manager::statsPolling : opened polling pipe file")

	reader := bufio.NewReader(pf)

	// create transfer pipe
	if err := createPipe(common.TransferPipe); err != nil {
		log.Err("stats_manager::statsPolling : [%v]", err)
		disableMonitoring()
		return
	}

	// open transfer pipe
	tf, err := os.OpenFile(common.TransferPipe, os.O_CREATE|os.O_WRONLY, 0777)
	if err != nil {
		log.Err("stats_manager::statsPolling : unable to open pipe file [%v]", err)
		disableMonitoring()
		return
	}
	defer tf.Close()

	log.Info("stats_manager::statsPolling : opened transfer pipe file")

	for {
		// read the polling message sent by stats monitor
		line, err := reader.ReadBytes('\n')
		if err != nil {
			log.Err("stats_manager::statsPolling : Unable to read from pipe [%v]", err)
			disableMonitoring()
			return
		}

		// validating poll message
		if !strings.Contains(string(line), "Poll at") {
			continue
		}

		// A write failure ends polling altogether: monitoring is disabled, so
		// there is no point in retrying the transfer pipe on the next poll.
		if err := writeChangedStats(tf); err != nil {
			log.Err("stats_manager::statsPolling : Unable to write to pipe [%v]", err)
			disableMonitoring()
			return
		}
	}
}

// writeChangedStats writes to tf, as one JSON document per line, every entry
// of stMgrOpt.statsList that has a non-empty Value and whose Timestamp differs
// from the one recorded for its component in stMgrOpt.cmpTimeMap. After a
// successful write the component's recorded timestamp is updated.
//
// Entries that cannot be marshalled are logged and skipped. The first write
// error is returned immediately, leaving the remaining entries unsent.
func writeChangedStats(tf *os.File) error {
	// TODO: check if this lock can be removed
	stMgrOpt.statsMtx.Lock()
	defer stMgrOpt.statsMtx.Unlock()

	for _, cmpSt := range stMgrOpt.statsList {
		if len(cmpSt.Value) == 0 {
			continue
		}

		if cmpSt.Timestamp == stMgrOpt.cmpTimeMap[cmpSt.ComponentName] {
			log.Debug("stats_manager::statsPolling : Skipping as there is no change in stats collected for %v", cmpSt.ComponentName)
			continue
		}

		msg, err := json.Marshal(cmpSt)
		if err != nil {
			log.Err("stats_manager::statsPolling : Unable to marshal [%v]", err)
			continue
		}

		// log.Debug("stats_manager::statsPolling : stats: %v", string(msg))

		// send the stats collected so far to transfer pipe; transferMtx is
		// held only for the duration of the write itself
		stMgrOpt.transferMtx.Lock()
		_, err = fmt.Fprintf(tf, "%s\n", msg)
		stMgrOpt.transferMtx.Unlock()
		if err != nil {
			return err
		}

		// Remember what was sent so an unchanged entry is skipped next poll.
		stMgrOpt.cmpTimeMap[cmpSt.ComponentName] = cmpSt.Timestamp
	}

	return nil
}