common/parallel/dirReader.go (105 lines of code) (raw):

// Copyright © Microsoft <wastore@microsoft.com>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package parallel

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"time"
)

// NewDirReader makes a directory reader. If parallelStat is true, then the reader
// uses a pool of goroutines to do the lookups from name of directory entry to full os.FileInfo.
// Useful on Linux, but not Windows.
// Why do we need this? Because on Linux os.Readdir does the same lookups, but it does them sequentially, which hurts performance.
// Alternatives like https://github.com/karrick/godirwalk avoid the lookup altogether, but only if you don't need any information
// about each entry other than whether it's a file or directory. We definitely also need to know whether it's a symlink.
// And, in our current architecture, we also need to get the size and LMT for the file.
func NewDirReader(totalAvailableParallelisim int, parallelStat bool) (DirReader, int) { if parallelStat { r := linuxDirReader{ ch: make(chan linuxDirEntry, 10000), } // allocate 3/4 of available parallelism to dir reads parallelismForDirReads := int(float32(totalAvailableParallelisim*3) / 4) if parallelismForDirReads < 1 { parallelismForDirReads = 1 } remainingParallelism := totalAvailableParallelisim - parallelismForDirReads if remainingParallelism < 1 { remainingParallelism = 1 } // spin up workers for i := 0; i < parallelismForDirReads; i++ { go r.worker() } return r, remainingParallelism } // we're not reading file properties in parallel return &defaultDirReader{}, totalAvailableParallelisim } //////////////////////// type defaultDirReader struct{} // Readdir in the default reader just makes the normal OS read call // On Windows, this is performant because Go does not have to make any additional OS calls to hydrate the raw results into os.FileInfos. func (_ defaultDirReader) Readdir(dir *os.File, n int) ([]os.FileInfo, error) { return dir.Readdir(n) } func (_ defaultDirReader) Close() { // noop } ///////////////////////// type linuxDirEntry struct { parentDir *os.File name string resultCh chan failableFileInfo } type linuxDirReader struct { ch chan linuxDirEntry } var ReaddirTimeoutError = errors.New("readdir timed out getting file properties") func (r linuxDirReader) Readdir(dir *os.File, n int) ([]os.FileInfo, error) { for try := 1; ; try++ { timeout := time.Duration(try*try) * time.Minute result, err := r.doReaddir(dir, n, timeout) if err == ReaddirTimeoutError && try <= 3 { // we saw a few timeouts in customer testing prior to adding this continue } return result, err } } func (r linuxDirReader) doReaddir(dir *os.File, n int, timeout time.Duration) ([]os.FileInfo, error) { // get the names names, err := dir.Readdirnames(n) if err != nil { return nil, err // TODO: is it correct to assume that if err has a value, then names is empty? 
} // enqueue the LStatting resCh := make(chan failableFileInfo, len(names)) // big enough that it will never get full and cause deadlocks for _, n := range names { r.ch <- linuxDirEntry{ parentDir: dir, name: n, resultCh: resCh, } } // collect the results res := make([]os.FileInfo, 0, 256) for i := 0; i < len(names); i++ { select { case r := <-resCh: res = append(res, r) // r is failableFileInfo, so may carry its own error with it case <-time.After(timeout): return nil, ReaddirTimeoutError } } return res, nil } // worker reads linuxDirEntries from channel, LStat's them, and sends the result back on the // entry-specific channel func (r linuxDirReader) worker() { for { e, ok := <-r.ch if !ok { return } path := filepath.Join(e.parentDir.Name(), e.name) fi, err := os.Lstat(path) // Lstat because we don't want to follow symlinks if err != nil { err = fmt.Errorf("%w (this error is harmless if the file '%s' has just been deleted. But in any other case, this error may be a real error)", err, path) } select { case e.resultCh <- failableFileInfoImpl{fi, err}: default: // give up on this one, so this worker doesn't get permanently blocked } } } func (r linuxDirReader) Close() { close(r.ch) // so workers know to shutdown } type failableFileInfoImpl struct { os.FileInfo error error } func (f failableFileInfoImpl) Error() error { return f.error }