tools/health-monitor/internal/stats_export.go (219 lines of code) (raw):
/*
_____ _____ _____ ____ ______ _____ ------
| | | | | | | | | | | | |
| | | | | | | | | | | | |
| --- | | | | |-----| |---- | | |-----| |----- ------
| | | | | | | | | | | | |
| ____| |_____ | ____| | ____| | |_____| _____| |_____ |_____
Licensed under the MIT License <http://opensource.org/licenses/MIT>.
Copyright © 2020-2025 Microsoft Corporation. All rights reserved.
Author : <blobfusedev@microsoft.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE
*/
package internal
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"sync/atomic"
"github.com/Azure/azure-storage-fuse/v2/common"
"github.com/Azure/azure-storage-fuse/v2/common/log"
"github.com/Azure/azure-storage-fuse/v2/internal/stats_manager"
hmcommon "github.com/Azure/azure-storage-fuse/v2/tools/health-monitor/common"
)
// ExportedStat is one sample queued on the exporter channel by AddMonitorStats.
type ExportedStat struct {
// Timestamp is the key used to group samples into a single Output entry.
Timestamp string
// MonitorName selects which Output field the sample is stored in.
MonitorName string
// Stat is the sample payload. addToList type-asserts it to
// stats_manager.PipeMsg, *hmcommon.CacheEvent or string depending on MonitorName.
Stat interface{}
}
// StatsExporter collects samples from the monitors and writes them, grouped by
// timestamp, to the output file as a JSON array.
type StatsExporter struct {
// channel queues samples from AddMonitorStats for the StatsExporter() worker goroutine.
channel chan ExportedStat
// wg tracks the worker goroutine; Destroy waits on it.
wg sync.WaitGroup
// opFile is the output file currently being written; it is (re)opened by getNewFile.
opFile *os.File
// outputList holds the per-timestamp entries that have not been written to opFile yet.
outputList []*Output
}
// Output is the JSON record written to the output file for one timestamp.
// Every field is omitted from the JSON when empty.
type Output struct {
// Timestamp identifies the record; samples with the same timestamp share one Output.
Timestamp string `json:"Timestamp,omitempty"`
// Bfs accumulates the samples received under hmcommon.BlobfuseStats.
Bfs []stats_manager.PipeMsg `json:"BlobfuseStats,omitempty"`
// FcEvent accumulates the samples received under hmcommon.FileCacheMon.
FcEvent []*hmcommon.CacheEvent `json:"FileCache,omitempty"`
// Cpu is the latest string received under hmcommon.CpuProfiler.
Cpu string `json:"CPUUsage,omitempty"`
// Mem is the latest string received under hmcommon.MemoryProfiler.
Mem string `json:"MemoryUsage,omitempty"`
// Net is the latest string received under hmcommon.NetworkProfiler.
Net string `json:"NetworkUsage,omitempty"`
}
var (
	// expLock serializes creation of the exporter instance.
	expLock sync.Mutex

	// se is the single exporter instance handed out by NewStatsExporter.
	se *StatsExporter

	// pidStatus is read and written atomically. It stays 0 while the exporter
	// accepts samples and is incremented by Destroy so that AddMonitorStats
	// stops writing to the channel once it is about to be closed.
	pidStatus int32
)
// NewStatsExporter returns the single StatsExporter instance, creating it on
// the first successful call.
//
// Creation opens the output file first and only then starts the worker
// goroutine and publishes the instance. If the file cannot be opened, nothing
// is published and no goroutine is started, so a later call can retry instead
// of receiving a half-initialized exporter.
//
// Returns the shared exporter, or nil and the error from getNewFile.
func NewStatsExporter() (*StatsExporter, error) {
	// Always take the lock: reading se without it races with the write below.
	expLock.Lock()
	defer expLock.Unlock()

	if se != nil {
		return se, nil
	}

	exporter := &StatsExporter{
		channel: make(chan ExportedStat, 10000),
	}

	if err := exporter.getNewFile(); err != nil {
		log.Err("stats_exporter::NewStatsExporter : [%v]", err)
		return nil, err
	}

	// The worker writes to opFile, so start it only once the file is ready.
	exporter.wg.Add(1)
	go exporter.StatsExporter()

	se = exporter
	return se, nil
}
// Destroy stops the exporter and finalizes the current output file.
//
// It marks the exporter as closed so AddMonitorStats stops queueing samples,
// closes the channel and waits for the worker goroutine to drain it, and only
// then writes the entries still held in memory followed by the closing "]"
// of the JSON array. Failures are logged; the remaining steps still run.
//
// Callers are expected to have stopped the monitors before calling Destroy:
// a send that is already in flight when the channel is closed would panic.
func (se *StatsExporter) Destroy() {
	// add 1 to the atomic variable. This will prevent writing to the channel in AddMonitorStats() method
	atomic.AddInt32(&pidStatus, 1)

	// Stop the worker before touching outputList or opFile: it is the only
	// other user of both, and it must consume the samples that are still
	// queued so they end up in the file as well.
	close(se.channel)
	se.wg.Wait()

	// write remaining data to the output file
	wroteEntry := false
	for _, op := range se.outputList {
		jsonData, err := json.MarshalIndent(op, "", "\t")
		if err != nil {
			log.Err("stats_exporter::Destroy : unable to marshal [%v]", err)
			// skip the entry entirely so no dangling separator is emitted for it
			continue
		}

		if wroteEntry {
			if _, err = se.opFile.WriteString(",\n"); err != nil {
				log.Err("stats_exporter::Destroy : unable to write to file [%v]", err)
			}
		}

		if _, err = se.opFile.Write(jsonData); err != nil {
			log.Err("stats_exporter::Destroy : unable to write to file [%v]", err)
		}
		wroteEntry = true
	}

	// Always terminate the array, even when no entry was pending, so a file
	// that only contains the opening "[" is still valid JSON.
	if _, err := se.opFile.WriteString("\n]"); err != nil {
		log.Err("stats_exporter::Destroy : unable to write to file [%v]", err)
	}

	if err := se.opFile.Close(); err != nil {
		log.Err("stats_exporter::Destroy : unable to close file [%v]", err)
	}
}
// AddMonitorStats queues one sample for the exporter worker.
//
// monName is the monitor that produced the sample, timestamp is the key the
// sample is grouped under, and st is the payload. The sample is ignored once
// Destroy has marked the exporter as closed. When the queue is full the oldest
// queued sample is discarded to make room.
func (se *StatsExporter) AddMonitorStats(monName string, timestamp string, st interface{}) {
	// Check the shutdown flag before touching the channel at all.
	if atomic.LoadInt32(&pidStatus) != 0 {
		return
	}

	stat := ExportedStat{
		Timestamp:   timestamp,
		MonitorName: monName,
		Stat:        st,
	}

	select {
	case se.channel <- stat:
	default:
		// Queue is full: drop the oldest sample. The receive is non-blocking
		// because the worker may have emptied the queue in the meantime, in
		// which case there is nothing to drop and nothing to wait for.
		select {
		case <-se.channel:
		default:
		}
		se.channel <- stat
	}
}
// StatsExporter is the worker loop: it consumes samples from the channel until
// the channel is closed and merges them into the per-timestamp entries of
// outputList. It signals the wait group when it returns.
func (se *StatsExporter) StatsExporter() {
	defer se.wg.Done()

	for sample := range se.channel {
		// Timestamp already tracked: merge into the existing entry.
		if pos := se.checkInList(sample.Timestamp); pos != -1 {
			se.addToList(&sample, pos)
			continue
		}

		// keep max 3 timestamps in memory: write out the oldest before adding a new one
		if len(se.outputList) >= 3 {
			if err := se.addToOutputFile(se.outputList[0]); err != nil {
				log.Err("stats_exporter::StatsExporter : [%v]", err)
			}
			se.outputList = se.outputList[1:]
		}

		entry := &Output{Timestamp: sample.Timestamp}
		se.outputList = append(se.outputList, entry)
		se.addToList(&sample, len(se.outputList)-1)
	}
}
// addToList stores the payload of st in the outputList entry at index idx.
//
// The destination field is chosen by st.MonitorName. Samples from an unknown
// monitor are ignored. A payload whose dynamic type does not match the one
// expected for the monitor is logged and dropped rather than panicking the
// worker goroutine.
func (se *StatsExporter) addToList(st *ExportedStat, idx int) {
	entry := se.outputList[idx]
	ok := true

	switch st.MonitorName {
	case hmcommon.BlobfuseStats:
		var msg stats_manager.PipeMsg
		if msg, ok = st.Stat.(stats_manager.PipeMsg); ok {
			entry.Bfs = append(entry.Bfs, msg)
		}
	case hmcommon.FileCacheMon:
		var event *hmcommon.CacheEvent
		if event, ok = st.Stat.(*hmcommon.CacheEvent); ok {
			entry.FcEvent = append(entry.FcEvent, event)
		}
	case hmcommon.CpuProfiler:
		var usage string
		if usage, ok = st.Stat.(string); ok {
			entry.Cpu = usage
		}
	case hmcommon.MemoryProfiler:
		var usage string
		if usage, ok = st.Stat.(string); ok {
			entry.Mem = usage
		}
	case hmcommon.NetworkProfiler:
		var usage string
		if usage, ok = st.Stat.(string); ok {
			entry.Net = usage
		}
	}

	if !ok {
		log.Err("stats_exporter::addToList : unexpected stat type %T for monitor %v", st.Stat, st.MonitorName)
	}
}
// checkInList looks up the in-memory entry for timestamp t.
// It returns the index of that entry in outputList, or -1 when there is none.
func (se *StatsExporter) checkInList(t string) int {
	for pos := 0; pos < len(se.outputList); pos++ {
		if se.outputList[pos].Timestamp == t {
			return pos
		}
	}
	return -1
}
// addToOutputFile appends op to the output file as tab-indented JSON and then
// lets checkOutputFile write the separator or rotate the file.
// The first error encountered is logged and returned.
func (se *StatsExporter) addToOutputFile(op *Output) error {
	encoded, marshalErr := json.MarshalIndent(op, "", "\t")
	if marshalErr != nil {
		log.Err("stats_exporter::addToOutputFile : unable to marshal [%v]", marshalErr)
		return marshalErr
	}

	if _, writeErr := se.opFile.Write(encoded); writeErr != nil {
		log.Err("stats_exporter::addToOutputFile : unable to write to file [%v]", writeErr)
		return writeErr
	}

	if checkErr := se.checkOutputFile(); checkErr != nil {
		log.Err("stats_exporter::addToOutputFile : [%v]", checkErr)
		return checkErr
	}

	return nil
}
// checkOutputFile is called after an entry has been written to the output file.
//
// While the file is smaller than hmcommon.OutputFileSizeinMB megabytes it
// writes the ",\n" separator for the next entry. Once the file has reached
// that size it terminates the JSON array with "\n]", closes the file and
// switches to a fresh one via getNewFile.
//
// Returns the first stat, write or rotation error; a failure to close the old
// file is logged but does not prevent the rotation.
func (se *StatsExporter) checkOutputFile() error {
	info, err := se.opFile.Stat()
	if err != nil {
		log.Err("stats_exporter::checkOutputFile : Unable to get file info [%v]", err)
		return err
	}

	// Below the size limit: keep appending to the current file.
	if info.Size() < hmcommon.OutputFileSizeinMB*common.MbToBytes {
		if _, err = se.opFile.WriteString(",\n"); err != nil {
			log.Err("stats_exporter::checkOutputFile : unable to write to file [%v]", err)
			return err
		}
		return nil
	}

	// Size limit reached: finish the current file and create a new one.
	if _, err = se.opFile.WriteString("\n]"); err != nil {
		log.Err("stats_exporter::checkOutputFile : unable to write to file [%v]", err)
		return err
	}

	log.Debug("stats_exporter::checkOutputFile : closing file %v", info.Name())
	if err = se.opFile.Close(); err != nil {
		log.Err("stats_exporter::checkOutputFile : unable to close file [%v]", err)
	}

	if err = se.getNewFile(); err != nil {
		log.Err("stats_exporter::checkOutputFile : [%v]", err)
		return err
	}

	return nil
}
// getNewFile rotates the existing output files and opens a fresh current file.
//
// Files are named "<base>_<pid>.<ext>" (current) and "<base>_<pid>_<n>.<ext>"
// (older, larger n is older). The file numbered OutputFileCount-1 is removed,
// every other numbered file moves up by one, and the current file becomes
// number 1. Remove/rename failures are ignored. The new current file is then
// opened in append mode, stored in se.opFile, and the opening "[" of the JSON
// array is written to it.
//
// Returns the error from opening or writing the new file, if any.
func (se *StatsExporter) getNewFile() error {
	prefix := filepath.Join(hmcommon.OutputPath, hmcommon.OutputFileName)

	// numbered builds the name of the rotated file with the given index.
	numbered := func(idx interface{}) string {
		return fmt.Sprintf("%v_%v_%v.%v", prefix, hmcommon.Pid, idx, hmcommon.OutputFileExtension)
	}

	// Remove the oldest file
	_ = os.Remove(numbered(hmcommon.OutputFileCount - 1))

	// Move each file to next number 8 -> 9, 7 -> 8, 6 -> 7 ...
	for n := hmcommon.OutputFileCount - 2; n > 0; n-- {
		from := numbered(n)
		to := numbered(n + 1)
		_ = os.Rename(from, to)
	}

	// Rename the latest file to _1
	current := fmt.Sprintf("%v_%v.%v", prefix, hmcommon.Pid, hmcommon.OutputFileExtension)
	first := fmt.Sprintf("%v_%v_1.%v", prefix, hmcommon.Pid, hmcommon.OutputFileExtension)
	_ = os.Rename(current, first)

	file, openErr := os.OpenFile(current, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0755)
	se.opFile = file
	if openErr != nil {
		log.Err("stats_exporter::getNewFile : Unable to create output file [%v]", openErr)
		return openErr
	}

	if _, writeErr := se.opFile.WriteString("["); writeErr != nil {
		log.Err("stats_exporter::getNewFile : unable to write to file [%v]", writeErr)
		return writeErr
	}

	return nil
}
// CloseExporter obtains the exporter instance through NewStatsExporter and
// destroys it. If the instance cannot be obtained, the failure is logged and
// the error from NewStatsExporter is returned.
func CloseExporter() error {
	exporter, err := NewStatsExporter()
	if err == nil && exporter != nil {
		exporter.Destroy()
		return nil
	}

	log.Err("stats_exporter::CloseExporter : Error in creating stats exporter instance [%v]", err)
	return err
}