in go/pkg/cas/upload.go [693:768]
func (u *uploader) visitDir(ctx context.Context, absPath string, pathExclude *regexp.Regexp) (*repb.DirectoryNode, error) {
var mu sync.Mutex
dir := &repb.Directory{}
var subErr error
var wgChildren sync.WaitGroup
// This sub-function exist to avoid holding the semaphore while waiting for
// children.
err := func() error {
if err := u.semFileIO.Acquire(ctx, 1); err != nil {
return err
}
defer u.semFileIO.Release(1)
f, err := os.Open(absPath)
if err != nil {
return err
}
defer f.Close()
// Check the context, since file IO functions don't.
for ctx.Err() == nil {
infos, err := f.Readdir(128)
if err == io.EOF {
break
}
if err != nil {
return err
}
for _, info := range infos {
info := info
absChild := joinFilePathsFast(absPath, info.Name())
wgChildren.Add(1)
u.wgFS.Add(1)
u.eg.Go(func() error {
defer wgChildren.Done()
defer u.wgFS.Done()
digested, err := u.visitPath(ctx, absChild, info, pathExclude)
mu.Lock()
defer mu.Unlock()
switch {
case err != nil:
subErr = err
return err
case digested == nil:
// This file should be ignored.
return nil
}
addDirEntry(dir, digested.dirEntry)
return nil
})
}
}
return nil
}()
if err != nil {
return nil, err
}
wgChildren.Wait()
if subErr != nil {
return nil, errors.Wrapf(subErr, "failed to read the directory %q entirely", absPath)
}
item := uploadItemFromDirMsg(absPath, dir)
if err := u.scheduleCheck(ctx, item); err != nil {
return nil, err
}
return &repb.DirectoryNode{
Name: filepath.Base(absPath),
Digest: item.Digest,
}, nil
}