component/xload/lister.go (161 lines of code) (raw):

/* _____ _____ _____ ____ ______ _____ ------ | | | | | | | | | | | | | | | | | | | | | | | | | | | --- | | | | |-----| |---- | | |-----| |----- ------ | | | | | | | | | | | | | | ____| |_____ | ____| | ____| | |_____| _____| |_____ |_____ Licensed under the MIT License <http://opensource.org/licenses/MIT>. Copyright © 2020-2025 Microsoft Corporation. All rights reserved. Author : <blobfusedev@microsoft.com> Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE */ package xload import ( "fmt" "os" "path/filepath" "time" "github.com/Azure/azure-storage-fuse/v2/common/config" "github.com/Azure/azure-storage-fuse/v2/common/log" "github.com/Azure/azure-storage-fuse/v2/internal" ) // verify that the below types implement the xcomponent interfaces var _ XComponent = &lister{} var _ XComponent = &remoteLister{} // verify that the below types implement the xenumerator interfaces var _ enumerator = &remoteLister{} type lister struct { XBase path string // base path of the directory to be listed defaultPermission os.FileMode // default permission of files and directories in the local path } type enumerator interface { mkdir(name string) error } // -------------------------------------------------------------------------------------------------------- type remoteLister struct { lister listBlocked bool } type remoteListerOptions struct { path string workerCount uint32 defaultPermission os.FileMode remote internal.Component statsMgr *StatsManager } func newRemoteLister(opts *remoteListerOptions) (*remoteLister, error) { if opts == nil || opts.path == "" || opts.remote == nil || opts.statsMgr == nil || opts.workerCount == 0 { log.Err("lister::NewRemoteLister : invalid parameters sent to create remote lister") return nil, fmt.Errorf("invalid parameters sent to create remote lister") } log.Debug("lister::NewRemoteLister : create new remote lister for %s, default permission %v, workers %v", opts.path, opts.defaultPermission, opts.workerCount) rl := &remoteLister{ lister: lister{ path: opts.path, defaultPermission: opts.defaultPermission, }, listBlocked: false, } rl.SetName(LISTER) rl.SetWorkerCount(opts.workerCount) rl.SetRemote(opts.remote) rl.SetStatsManager(opts.statsMgr) rl.Init() return rl, nil } func (rl *remoteLister) Init() { rl.SetThreadPool(NewThreadPool(rl.GetWorkerCount(), rl.Process)) if rl.GetThreadPool() == nil { log.Err("remoteLister::Init : fail to init thread pool") } } func (rl *remoteLister) Start() { log.Debug("remoteLister::Start : start remote lister for %s", rl.path) rl.GetThreadPool().Start() rl.Schedule(&WorkItem{CompName: rl.GetName()}) } func (rl *remoteLister) Stop() { log.Debug("remoteLister::Stop : stop remote lister for %s", rl.path) if rl.GetThreadPool() != nil { rl.GetThreadPool().Stop() } rl.GetNext().Stop() } // wait for the configured block-list-on-mount-sec to make the list call func waitForListTimeout() error { var blockListSeconds uint16 = 0 err := config.UnmarshalKey("azstorage.block-list-on-mount-sec", &blockListSeconds) if err != nil { return err } time.Sleep(time.Duration(blockListSeconds) * time.Second) return nil } func (rl *remoteLister) Process(item *WorkItem) (int, error) { relPath := item.Path // TODO:: xload : check this for subdirectory mounting log.Debug("remoteLister::Process : Reading remote dir %s", relPath) // this block will be executed only in the first list call for the remote directory // so haven't made the listBlocked variable atomic if !rl.listBlocked { log.Debug("remoteLister::Process : Waiting for block-list-on-mount-sec before making the list call") err := waitForListTimeout() if err != nil { log.Err("remoteLister::Process : unable to unmarshal block-list-on-mount-sec [%s]", err.Error()) return 0, err } rl.listBlocked = true } marker := "" var cnt, iteration int for { entries, new_marker, err := rl.GetRemote().StreamDir(internal.StreamDirOptions{ Name: relPath, Token: marker, }) if err != nil { log.Err("remoteLister::Process : Remote listing failed for %s [%s]", relPath, err.Error()) break } marker = new_marker cnt += len(entries) iteration++ log.Debug("remoteLister::Process : count: %d , iterations: %d", cnt, iteration) // send number of items listed in current iteration to stats manager rl.GetStatsManager().AddStats(&StatsItem{ Component: LISTER, Name: relPath, ListerCount: uint64(len(entries)), }) for _, entry := range entries { log.Debug("remoteLister::Process : Iterating: %s, Is directory: %v", entry.Path, entry.IsDir()) if entry.IsDir() { // create directory in local // spawn go routine for directory creation and then // adding to the input channel of the listing component // TODO:: xload : check how many threads can we spawn go func(name string) { localPath := filepath.Join(rl.path, name) err = rl.mkdir(localPath) // TODO:: xload : handle error if err != nil { log.Err("remoteLister::Process : Failed to create directory [%s]", err.Error()) return } // push the directory to input pool for its listing rl.Schedule(&WorkItem{ CompName: rl.GetName(), Path: name, }) }(entry.Path) } else { fileMode := rl.defaultPermission if !entry.IsModeDefault() { fileMode = entry.Mode } // send file to the splitter's channel for chunking rl.GetNext().Schedule(&WorkItem{ CompName: rl.GetNext().GetName(), Path: entry.Path, DataLen: uint64(entry.Size), Mode: fileMode, Atime: entry.Atime, Mtime: entry.Mtime, MD5: entry.MD5, }) } } if len(new_marker) == 0 { log.Debug("remoteLister::Process : remote listing done for %s", relPath) break } } return cnt, nil } func (rl *remoteLister) mkdir(name string) error { log.Debug("remoteLister::mkdir : Creating local path: %s, mode %v", name, rl.defaultPermission) err := os.MkdirAll(name, rl.defaultPermission) // send stats for dir creation rl.GetStatsManager().AddStats(&StatsItem{ Component: LISTER, Name: name, Dir: true, Success: err == nil, Download: true, }) return err }