common/parallel/TreeCrawler.go (133 lines of code) (raw):

// Copyright © Microsoft <wastore@microsoft.com> // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package parallel import ( "context" "sync" "time" ) type crawler struct { output chan CrawlResult workerBody EnumerateOneDirFunc parallelism int cond *sync.Cond // the following are protected by cond (and must only be accessed when cond.L is held) unstartedDirs []Directory // not a channel, because channels have length limits, and those get in our way dirInProgressCount int64 lastAutoShutdown time.Time } type Directory interface{} type DirectoryEntry interface{} type CrawlResult struct { item DirectoryEntry err error } func (r CrawlResult) Item() (interface{}, error) { return r.item, r.err } // must be safe to be simultaneously called by multiple go-routines, each with a different dir type EnumerateOneDirFunc func(dir Directory, enqueueDir func(Directory), enqueueOutput func(DirectoryEntry, error)) error // Crawl crawls an abstract directory tree, using the supplied enumeration function. May be use for whatever // that function can enumerate (i.e. not necessarily a local file system, just anything tree-structured) func Crawl(ctx context.Context, root Directory, worker EnumerateOneDirFunc, parallelism int) <-chan CrawlResult { c := &crawler{ unstartedDirs: make([]Directory, 0, 1024), output: make(chan CrawlResult, 1000), workerBody: worker, parallelism: parallelism, cond: sync.NewCond(&sync.Mutex{}), } go c.start(ctx, root) return c.output } func (c *crawler) start(ctx context.Context, root Directory) { done := make(chan struct{}) heartbeat := func() { for { select { case <-done: return case <-time.After(10 * time.Second): c.cond.Broadcast() // prevent things waiting for ever, even after cancellation has happened } } } go heartbeat() c.unstartedDirs = append(c.unstartedDirs, root) c.runWorkersToCompletion(ctx) close(c.output) close(done) } func (c *crawler) runWorkersToCompletion(ctx context.Context) { wg := &sync.WaitGroup{} for i := 0; i < c.parallelism; i++ { wg.Add(1) go c.workerLoop(ctx, wg, i) } wg.Wait() } func (c *crawler) workerLoop(ctx context.Context, wg *sync.WaitGroup, workerIndex int) { defer wg.Done() var err error mayHaveMore := true for mayHaveMore && ctx.Err() == nil { mayHaveMore, err = c.processOneDirectory(ctx, workerIndex) if err != nil { c.output <- CrawlResult{err: err} // output the error, but we don't necessarily stop the enumeration (e.g. it might be one unreadable dir) } } } func (c *crawler) processOneDirectory(ctx context.Context, workerIndex int) (bool, error) { const maxQueueDirectories = 1000 * 1000 const maxQueueDirsForBreadthFirst = 100 * 1000 // figure is somewhat arbitrary. Want it big, but not huge var toExamine Directory stop := false // Acquire a directory to work on // Note that we need explicit locking because there are two // mutable things involved in our decision making, not one. (The two being c.unstartedDirs and c.dirInProgressCount) c.cond.L.Lock() { // wait while there's nothing to do, and another thread might be going to add something for len(c.unstartedDirs) == 0 && c.dirInProgressCount > 0 && ctx.Err() == nil { c.cond.Wait() // temporarily relinquish the lock (just on this line only) while we wait for a Signal/Broadcast } // if we have something to do now, grab it. Else we must be all finished with nothing more to do (ever) stop = ctx.Err() != nil if !stop { if len(c.unstartedDirs) > 0 { if len(c.unstartedDirs) < maxQueueDirsForBreadthFirst { // pop from start of list. This gives a breadth-first flavour to the search. // (Breadth-first is useful for distributing small-file workloads over the full keyspace, which // is can help performance when uploading small files to Azure Blob Storage) toExamine = c.unstartedDirs[0] c.unstartedDirs = c.unstartedDirs[1:] } else { // Fall back to popping from end of list if list is already pretty big. // This gives more of a depth-first flavour to our processing, // which (we think) will prevent c.unstartedDirs getting really large and using too much RAM. // (Since we think that depth first tends to hit leaf nodes relatively quickly, so total number of // unstarted dirs should tend to grow less in a depth first mode) lastIndex := len(c.unstartedDirs) - 1 toExamine = c.unstartedDirs[lastIndex] c.unstartedDirs = c.unstartedDirs[:lastIndex] } c.dirInProgressCount++ // record that we are working on something c.cond.Broadcast() // and let other threads know of that fact } else { if c.dirInProgressCount > 0 { // something has gone wrong in the design of this algorithm, because we should only get here if all done now panic("assertion failure: should be no more dirs in progress here") } stop = true } } } c.cond.L.Unlock() if stop { return false, nil } // find dir's immediate children (outside the lock, because this could be slow) var foundDirectories = make([]Directory, 0, 16) addDir := func(d Directory) { foundDirectories = append(foundDirectories, d) } addOutput := func(de DirectoryEntry, er error) { select { case c.output <- CrawlResult{item: de, err: er}: case <-ctx.Done(): // don't block on full channel if cancelled } } bodyErr := c.workerBody(toExamine, addDir, addOutput) // this is the worker body supplied by our caller // finally, update shared state (inside the lock) c.cond.L.Lock() defer c.cond.L.Unlock() c.unstartedDirs = append(c.unstartedDirs, foundDirectories...) // do NOT try to wait here if unstartedDirs is getting big. May cause deadlocks, due to all workers waiting and none processing the queue c.dirInProgressCount-- // we were doing something, and now we have finished it c.cond.Broadcast() // let other workers know that the state has changed // If our queue of unstarted stuff is getting really huge, // reduce our parallelism in the hope of preventing further excessive RAM growth. // (It's impossible to know exactly what to do here, because we don't know whether more workers would _clear_ // the queue more quickly; or _add to_ the queue more quickly. It depends on whether the directories we process // next contain mostly child directories or if they are "leaf" directories containing mostly just files. But, // if we slowly reduce parallelism the end state is closer to a single-threaded depth-first traversal, which // is generally fine in terms of memory usage on most folder structures) shouldShutSelfDown := len(c.unstartedDirs) > maxQueueDirectories && // we are getting way too much stuff queued up workerIndex > (c.parallelism/4) && // never shut down the last ones, since we need something left to clear the queue time.Since(c.lastAutoShutdown) > time.Second // adjust somewhat gradually if shouldShutSelfDown { c.lastAutoShutdown = time.Now() return false, bodyErr } return true, bodyErr // true because, as far as we know, the work is not finished. And err because it was the err (if any) from THIS dir }