component/xload/stats_manager.go (222 lines of code) (raw):

/* _____ _____ _____ ____ ______ _____ ------ | | | | | | | | | | | | | | | | | | | | | | | | | | | --- | | | | |-----| |---- | | |-----| |----- ------ | | | | | | | | | | | | | | ____| |_____ | ____| | ____| | |_____| _____| |_____ |_____ Licensed under the MIT License <http://opensource.org/licenses/MIT>. Copyright © 2020-2025 Microsoft Corporation. All rights reserved. Author : <blobfusedev@microsoft.com> Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE
*/

package xload

import (
	"encoding/json"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/Azure/azure-storage-fuse/v2/common"
	"github.com/Azure/azure-storage-fuse/v2/common/log"
)

// StatsManager aggregates the progress counters of an xload job.
//
// Stats items are received on the items channel and folded into the counters
// by the stats processor goroutine; the counters themselves are plain
// integers and are only updated from that goroutine. A second goroutine, the
// stats exporter, periodically enqueues a STATS_MANAGER item which makes the
// processor log the current progress and, when fileHandle is set, append it
// to the JSON stats file.
type StatsManager struct {
	totalFiles      uint64         // total number of files that have been scanned so far (sum of ListerCount of the lister items)
	success         uint64         // number of items that have been successfully processed (directories reported by the lister are counted here too)
	failed          uint64         // number of items that failed
	dirs            uint64         // number of directories processed
	bytesDownloaded uint64         // total number of bytes downloaded
	bytesUploaded   uint64         // total number of bytes uploaded
	startTime       time.Time      // time at which the stats manager started, used as the origin for the bandwidth calculation
	fileHandle      *os.File       // file where stats will be dumped; nil when the JSON export is disabled
	waitGroup       sync.WaitGroup // wait group to wait for the stats processor goroutine to finish
	items           chan *StatsItem // channel to hold the stats items
	done            chan bool      // channel used to tell the stats exporter goroutine to exit
}

// StatsItem is a single progress report sent to the stats manager through
// AddStats. Component decides which of the remaining fields are read by the
// stats processor.
type StatsItem struct {
	Component        string // component name which has exported the stat
	ListerCount      uint64 // number of files scanned by the lister in an iteration
	Name             string // name of the file processed
	Dir              bool   // flag to indicate if the item is a directory
	Success          bool   // flag to indicate if the file has been processed successfully or not
	Download         bool   // flag to denote upload or download: true for download, false for upload
	BytesTransferred uint64 // bytes uploaded or downloaded for this file
}

// statsJSONData is one progress snapshot as it is serialized into the JSON
// stats file.
type statsJSONData struct {
	Timestamp        string  `json:"Timestamp"`
	PercentCompleted float64 `json:"PercentCompleted"`
	Total            uint64  `json:"Total"`
	Done             uint64  `json:"Done"`
	Failed           uint64  `json:"Failed"`
	Pending          uint64  `json:"Pending"`
	BytesTransferred uint64  `json:"BytesTransferred"`
	BandwidthMbps    float64 `json:"Bandwidth(Mbps)"`
}
const ( STATS_MANAGER = "STATS_MANAGER" DURATION = 4 // time interval in seconds at which the stats will be dumped JSON_FILE_NAME = "xload_stats_{PID}.json" // json file name where the stats manager will dump the stats ) func NewStatsManager(count uint32, isExportEnabled bool) (*StatsManager, error) { var fh *os.File var err error if isExportEnabled { pid := fmt.Sprintf("%v", os.Getpid()) path := common.ExpandPath(filepath.Join(common.DefaultWorkDir, strings.ReplaceAll(JSON_FILE_NAME, "{PID}", pid))) log.Crit("statsManager::NewStatsManager : creating json file %v", path) fh, err = os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755) if err != nil { log.Err("statsManager::NewStatsManager : failed to create json file %v [%v]", path, err.Error()) return nil, err } } return &StatsManager{ fileHandle: fh, items: make(chan *StatsItem, count*2), done: make(chan bool, 1), }, nil } func (sm *StatsManager) Start() { sm.waitGroup.Add(1) sm.startTime = time.Now().UTC() log.Debug("statsManager::start : start stats manager at time %v", sm.startTime.Format(time.RFC1123)) _ = sm.writeToJSON([]byte("[\n"), false) _ = sm.marshalStatsData(&statsJSONData{Timestamp: sm.startTime.Format(time.RFC1123)}, false) _ = sm.writeToJSON([]byte("\n]"), false) go sm.statsProcessor() go sm.statsExporter() } // TODO:: xload : the stop method runs on unmount. 
See if the channels can be closed if the job is 100% complete func (sm *StatsManager) Stop() { log.Debug("statsManager::stop : stop stats manager") sm.done <- true // close the stats exporter thread close(sm.done) // TODO::xload : check if closing the done channel here will lead to closing the stats exporter thread close(sm.items) sm.waitGroup.Wait() if sm.fileHandle != nil { sm.fileHandle.Close() } } func (sm *StatsManager) AddStats(item *StatsItem) { sm.items <- item } func (sm *StatsManager) updateSuccessFailedCtr(isSuccess bool) { if isSuccess { sm.success += 1 } else { sm.failed += 1 } } func (sm *StatsManager) statsProcessor() { defer sm.waitGroup.Done() for item := range sm.items { switch item.Component { case LISTER: sm.totalFiles += item.ListerCount // log.Debug("statsManager::statsProcessor : Directory listed %v, total number of files listed so far = %v", item.name, sm.totalFiles) if item.Dir { sm.dirs += 1 sm.updateSuccessFailedCtr(item.Success) } case SPLITTER: // log.Debug("statsManager::statsProcessor : splitter: Name %v, success %v, download %v", item.name, item.success, item.download) sm.updateSuccessFailedCtr(item.Success) case DATA_MANAGER: // log.Debug("statsManager::statsProcessor : data manager: Name %v, success %v, download %v, bytes transferred %v", item.name, item.success, item.download, item.bytesTransferred) if item.Download { sm.bytesDownloaded += item.BytesTransferred } else { sm.bytesUploaded += item.BytesTransferred } case STATS_MANAGER: sm.calculateBandwidth() default: log.Err("statsManager::statsProcessor : wrong component name used for sending stats") } } log.Debug("statsManager::statsProcessor : stats processor completed") } func (sm *StatsManager) statsExporter() { ticker := time.NewTicker(DURATION * time.Second) for { select { case <-sm.done: ticker.Stop() return case <-ticker.C: sm.AddStats(&StatsItem{ Component: STATS_MANAGER, }) } } } func (sm *StatsManager) calculateBandwidth() { if sm.totalFiles == 0 { 
log.Debug("statsManager::calculateBandwidth : skipping as total files listed so far is %v", sm.totalFiles) return } currTime := time.Now().UTC() timeLapsed := currTime.Sub(sm.startTime).Seconds() bytesTransferred := sm.bytesDownloaded + sm.bytesUploaded filesProcessed := sm.success + sm.failed filesPending := sm.totalFiles - filesProcessed percentCompleted := (float64(filesProcessed) / float64(sm.totalFiles)) * 100 bandwidthMbps := float64(bytesTransferred*8) / (timeLapsed * float64(MB)) log.Crit("statsManager::calculateBandwidth : timestamp %v, %.2f%%, %v Done, %v Failed, "+ "%v Pending, %v Total, Bytes transferred %v, Throughput (Mbps): %.2f", currTime.Format(time.RFC1123), percentCompleted, sm.success, sm.failed, filesPending, sm.totalFiles, bytesTransferred, bandwidthMbps) if sm.fileHandle != nil { err := sm.marshalStatsData(&statsJSONData{ Timestamp: currTime.Format(time.RFC1123), PercentCompleted: RoundFloat(percentCompleted, 2), Total: sm.totalFiles, Done: sm.success, Failed: sm.failed, Pending: filesPending, BytesTransferred: bytesTransferred, BandwidthMbps: RoundFloat(bandwidthMbps, 2), }, true) if err != nil { log.Err("statsManager::calculateBandwidth : failed to write to json file [%v]", err.Error()) } } // TODO:: xload : determine more effective way to decide if the listing has completed and the stats exporter can be terminated if sm.totalFiles == filesProcessed && sm.totalFiles != sm.dirs { sm.done <- true return } } func (sm *StatsManager) marshalStatsData(data *statsJSONData, seek bool) error { if sm.fileHandle == nil { return nil } jsonData, err := json.MarshalIndent(data, "", "\t") if err != nil { log.Err("statsManager::convertToBytes : unable to marshal [%v]", err.Error()) return err } err = sm.writeToJSON(jsonData, seek) if err != nil { log.Err("statsManager::convertToBytes : failed to write to json file [%v]", err.Error()) return err } return nil } func (sm *StatsManager) writeToJSON(data []byte, seek bool) error { if sm.fileHandle == nil { 
return nil } var err error if seek { _, err = sm.fileHandle.Seek(-2, io.SeekEnd) if err != nil { log.Err("statsManager::writeToJSON : failed to seek [%v]", err.Error()) return err } _, err = sm.fileHandle.Write([]byte(",\n")) if err != nil { log.Err("statsManager::writeToJSON : failed to write to json file [%v]", err.Error()) return err } } _, err = sm.fileHandle.Write(data) if err != nil { log.Err("statsManager::writeToJSON : failed to write to json file [%v]", err.Error()) return err } if seek { _, err = sm.fileHandle.Write([]byte("\n]")) if err != nil { log.Err("statsManager::writeToJSON : failed to write to json file [%v]", err.Error()) return err } } return nil }