in go/pkg/cas/upload.go [590:680]
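// visitRegularFile hashes a regular file and returns the corresponding
// FileNode, scheduling the contents for upload if the CAS does not already
// have them. Three size classes are handled differently:
//   - small (<= SmallFileThreshold): buffered entirely in memory and handed
//     to scheduleCheck as a blob;
//   - medium: hashed by streaming from disk; the file is closed and reopened
//     only if an upload turns out to be needed;
//   - large (>= LargeFileThreshold): additionally gated by semLargeFile,
//     presence-checked explicitly, and streamed over ByteStream.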
func (u *uploader) visitRegularFile(ctx context.Context, absPath string, info os.FileInfo) (*repb.FileNode, error) {
isLarge := info.Size() >= u.Config.LargeFileThreshold
// Acquire the large-file semaphore before semFileIO so that units of the
// more general semaphore are not hogged while waiting for a large-file slot.
if isLarge {
// Read only a few large files at a time.
if err := u.semLargeFile.Acquire(ctx, 1); err != nil {
return nil, errors.WithStack(err)
}
defer u.semLargeFile.Release(1)
}
if err := u.semFileIO.Acquire(ctx, 1); err != nil {
return nil, errors.WithStack(err)
}
defer u.semFileIO.Release(1)
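// From here on, one semFileIO unit is held until this function returns; for
// large files that covers both the hashing and the streaming upload below.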
f, err := u.openFileSource(absPath)
if err != nil {
return nil, err
}
defer f.Close()
ret := &repb.FileNode{
Name: info.Name(),
IsExecutable: (info.Mode() & 0100) != 0, // owner-execute bit
}
if info.Size() <= u.Config.SmallFileThreshold {
// This file is small enough to buffer it entirely.
contents, err := io.ReadAll(f)
if err != nil {
return nil, err
}
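// scheduleCheck batches this digest into a presence check and, judging by
// the BatchUploads remark in the large-file branch below, uploads the blob
// in a batch if it is missing.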
item := uploadItemFromBlob(absPath, contents)
ret.Digest = item.Digest
return ret, u.scheduleCheck(ctx, item)
}
// Medium or large file: compute the digest by streaming from disk instead of
// buffering the contents.
tctx, task := trace.NewTask(ctx, "medium or large file")
defer task.End()
trace.Log(tctx, "file", info.Name())
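// The task, region, and log calls above come from runtime/trace and make the
// hashing of this file visible in `go tool trace` output.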
// Compute the hash.
now := time.Now()
region := trace.StartRegion(tctx, "digest")
dig, err := digest.NewFromReader(f)
region.End()
if err != nil {
return nil, errors.Wrap(err, "failed to compute hash")
}
log.Infof("compute digest %s: %s", info.Name(), time.Since(now))
ret.Digest = dig.ToProto()
item := &uploadItem{
Title: absPath,
Digest: ret.Digest,
}
if isLarge {
// Large files are special: locality matters, so we want to re-read the
// file as soon as possible after hashing it.
// Also, we are not going to use BatchUploads for them anyway, so we can
// take advantage of ByteStream's built-in presence check.
// https://github.com/bazelbuild/remote-apis/blob/0cd22f7b466ced15d7803e8845d08d3e8d2c51bc/build/bazel/remote/execution/v2/remote_execution.proto#L250-L254
if res, err := u.findMissingBlobs(ctx, []*uploadItem{item}); err != nil {
return nil, errors.Wrap(err, "failed to check existence")
} else if len(res.MissingBlobDigests) == 0 {
log.Infof("the file already exists. do not upload %s", absPath)
atomic.AddInt64(&u.stats.CacheHits.Digests, 1)
atomic.AddInt64(&u.stats.CacheHits.Bytes, ret.Digest.SizeBytes)
return ret, nil
}
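// Reuse the already-open file handle: rewind it instead of reopening.
// This is safe because u.stream below runs before the deferred f.Close
// fires, and it preserves OS-cache locality for the re-read.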
item.Open = func() (uploadSource, error) {
return f, f.SeekStart(0)
}
return ret, u.stream(tctx, item, true)
}
// Schedule a check and close the file (in defer).
// item.Open will reopen the file.
item.Open = func() (uploadSource, error) {
return u.openFileSource(absPath)
}
return ret, u.scheduleCheck(ctx, item)
}
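
// Note: openFileSource and uploadSource are defined elsewhere in this file.
// Below is a minimal sketch of the contract visitRegularFile relies on,
// reconstructed from the call sites above (io.Reader for hashing, Close,
// SeekStart) rather than from the actual definitions, which are not part of
// this excerpt:
//
//    type uploadSource interface {
//        io.ReadCloser
//        // SeekStart rewinds to the given offset from the beginning of the
//        // source, allowing a large file to be re-read after hashing.
//        SeekStart(offset int64) error
//    }
//
//    // openFileSource opens absPath and wraps it as an uploadSource,
//    // possibly with buffering.
//    func (u *uploader) openFileSource(absPath string) (uploadSource, error)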