func()

in go/pkg/cas/upload.go [693:768]


// visitDir reads the directory at absPath and returns a DirectoryNode for it.
//
// Directory entries are read in batches of 128 while holding one unit of
// u.semFileIO. Every entry is passed to u.visitPath on a goroutine started via
// u.eg; entries for which visitPath returns a nil result are skipped, all
// others are added to the resulting repb.Directory. After all children have
// finished, the directory message is handed to u.scheduleCheck and a node
// carrying the base name of absPath and the item's digest is returned.
//
// An error is returned if the semaphore cannot be acquired, the directory
// cannot be opened or read, ctx is done before the listing completes, any
// child fails, or scheduling the check fails.
func (u *uploader) visitDir(ctx context.Context, absPath string, pathExclude *regexp.Regexp) (*repb.DirectoryNode, error) {
	// mu guards dir and subErr, which are written by the child goroutines.
	var mu sync.Mutex
	dir := &repb.Directory{}
	var subErr error
	var wgChildren sync.WaitGroup

	// This sub-function exists to avoid holding the semaphore while waiting
	// for children.
	err := func() error {
		if err := u.semFileIO.Acquire(ctx, 1); err != nil {
			return err
		}
		defer u.semFileIO.Release(1)

		f, err := os.Open(absPath)
		if err != nil {
			return err
		}
		defer f.Close()

		for {
			// Check the context, since file IO functions don't. Report the
			// cancellation instead of silently stopping: otherwise a partially
			// listed directory could be treated as complete below.
			if err := ctx.Err(); err != nil {
				return err
			}

			infos, err := f.Readdir(128)
			if err == io.EOF {
				return nil
			}
			if err != nil {
				return err
			}

			for _, info := range infos {
				info := info // per-iteration copy for the closure below
				absChild := joinFilePathsFast(absPath, info.Name())
				wgChildren.Add(1)
				u.wgFS.Add(1)
				u.eg.Go(func() error {
					defer wgChildren.Done()
					defer u.wgFS.Done()
					digested, err := u.visitPath(ctx, absChild, info, pathExclude)
					mu.Lock()
					defer mu.Unlock()

					switch {
					case err != nil:
						// Keep the first failure; later ones are likely to be
						// secondary (e.g. cancellation triggered by the first).
						if subErr == nil {
							subErr = err
						}
						return err
					case digested == nil:
						// This file should be ignored.
						return nil
					}

					addDirEntry(dir, digested.dirEntry)
					return nil
				})
			}
		}
	}()
	if err != nil {
		return nil, err
	}

	wgChildren.Wait()
	if subErr != nil {
		return nil, errors.Wrapf(subErr, "failed to read the directory %q entirely", absPath)
	}

	item := uploadItemFromDirMsg(absPath, dir)
	if err := u.scheduleCheck(ctx, item); err != nil {
		return nil, err
	}
	return &repb.DirectoryNode{
		Name:   filepath.Base(absPath),
		Digest: item.Digest,
	}, nil
}