func (u *uploader) visitRegularFile()

in go/pkg/cas/upload.go [590:680]
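
visitRegularFile converts a regular file on disk into a repb.FileNode, picking an upload strategy by size: small files are buffered whole and scheduled as a batch item, medium files are hashed immediately and re-opened lazily for a later batch check, and large files are throttled by a dedicated semaphore, checked for presence, and streamed over ByteStream.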


func (u *uploader) visitRegularFile(ctx context.Context, absPath string, info os.FileInfo) (*repb.FileNode, error) {
	isLarge := info.Size() >= u.Config.LargeFileThreshold

	// Acquire the scarce large-file semaphore before the file-IO semaphore so
	// that a large file does not hold an IO slot while waiting for its turn.
	if isLarge {
		// Read only a few large files at a time.
		if err := u.semLargeFile.Acquire(ctx, 1); err != nil {
			return nil, errors.WithStack(err)
		}
		defer u.semLargeFile.Release(1)
	}

	if err := u.semFileIO.Acquire(ctx, 1); err != nil {
		return nil, err
	}
	defer u.semFileIO.Release(1)

	f, err := u.openFileSource(absPath)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	ret := &repb.FileNode{
		Name:         info.Name(),
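		// 0100 is the owner-execute permission bit.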
		IsExecutable: (info.Mode() & 0100) != 0,
	}

	if info.Size() <= u.Config.SmallFileThreshold {
		// This file is small enough to buffer entirely in memory.
		contents, err := ioutil.ReadAll(f)
		if err != nil {
			return nil, err
		}
		item := uploadItemFromBlob(absPath, contents)
		ret.Digest = item.Digest
		return ret, u.scheduleCheck(ctx, item)
	}

	// It is a medium or large file.

	tctx, task := trace.NewTask(ctx, "medium or large file")
	defer task.End()
	trace.Log(tctx, "file", info.Name())

	// Compute the hash.
	now := time.Now()
	region := trace.StartRegion(tctx, "digest")
	dig, err := digest.NewFromReader(f)
	region.End()
	if err != nil {
		return nil, errors.Wrapf(err, "failed to compute hash")
	}
	log.Infof("compute digest %s: %s", info.Name(), time.Since(now))
	ret.Digest = dig.ToProto()

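	// From here on, the upload item carries only metadata; each branch below
	// re-supplies the file contents lazily via item.Open.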
	item := &uploadItem{
		Title:  absPath,
		Digest: ret.Digest,
	}

	if isLarge {
		// Large files are special: locality is important, so we want to re-read
		// the file ASAP, while it is likely still in the OS cache.
		// Also, large files do not go through BatchUploads anyway, so we can take
		// advantage of ByteStream's built-in presence check.
		// https://github.com/bazelbuild/remote-apis/blob/0cd22f7b466ced15d7803e8845d08d3e8d2c51bc/build/bazel/remote/execution/v2/remote_execution.proto#L250-L254

		if res, err := u.findMissingBlobs(ctx, []*uploadItem{item}); err != nil {
			return nil, errors.Wrapf(err, "failed to check existence")
		} else if len(res.MissingBlobDigests) == 0 {
			log.Infof("the file already exists. do not upload %s", absPath)
			atomic.AddInt64(&u.stats.CacheHits.Digests, 1)
			atomic.AddInt64(&u.stats.CacheHits.Bytes, ret.Digest.SizeBytes)
			return ret, nil
		}

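		// Reuse the already-open file handle, rewound to the start: computing
		// the digest consumed the reader.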
		item.Open = func() (uploadSource, error) {
			return f, f.SeekStart(0)
		}
		return ret, u.stream(tctx, item, true)
	}

	// Schedule a check and close the file (in defer).
	// item.Open will reopen the file.

	item.Open = func() (uploadSource, error) {
		return u.openFileSource(absPath)
	}
	return ret, u.scheduleCheck(ctx, item)
}
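
The two-semaphore gate at the top of the function is the main throughput control: every read must hold a file-IO slot, and large files must additionally win one of a small number of large-file slots before competing for IO, in that order, so a queued large file never pins an IO slot. Below is a minimal, self-contained sketch of that pattern using golang.org/x/sync/semaphore. The limits (maxFileIO, maxLargeFiles, largeFileThreshold) and the readGated helper are invented for illustration; the real uploader derives its limits from its Config.

package main

import (
	"context"
	"fmt"
	"os"

	"golang.org/x/sync/semaphore"
)

// Hypothetical limits; the real uploader takes these from its Config.
const (
	maxFileIO          = 128     // concurrent file reads of any size
	maxLargeFiles      = 2       // concurrent reads of large files
	largeFileThreshold = 1 << 20 // assumed 1 MiB cut-off
)

var (
	semFileIO    = semaphore.NewWeighted(maxFileIO)
	semLargeFile = semaphore.NewWeighted(maxLargeFiles)
)

// readGated mirrors the acquisition order in visitRegularFile: the scarce
// large-file semaphore is taken first, so a large file queues there without
// occupying one of the general IO slots.
func readGated(ctx context.Context, path string) ([]byte, error) {
	info, err := os.Stat(path)
	if err != nil {
		return nil, err
	}
	if info.Size() >= largeFileThreshold {
		if err := semLargeFile.Acquire(ctx, 1); err != nil {
			return nil, err
		}
		defer semLargeFile.Release(1)
	}
	if err := semFileIO.Acquire(ctx, 1); err != nil {
		return nil, err
	}
	defer semFileIO.Release(1)
	return os.ReadFile(path)
}

func main() {
	b, err := readGated(context.Background(), os.Args[1])
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Printf("read %d bytes\n", len(b))
}

The defers release the slots in reverse acquisition order, and cancelling the context aborts both Acquire calls with the context's error, which is what lets callers shut down promptly while reads are still queued.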