in go/pkg/client/cas.go [1589:1689]
// downloadNonUnified downloads every blob described by outputs into outDir,
// using batch downloads when c.useBatchOps is set and one request per blob
// otherwise.
//
// outputs maps a blob digest to the TreeOutput (relative path and executable
// bit) it should be materialized as. Downloads run concurrently in an
// errgroup, with each goroutine holding one slot of c.casDownloaders while it
// works.
//
// It returns the accumulated transfer statistics together with the first
// error reported by any download goroutine. The statistics are returned even
// when the error is non-nil, in which case they only cover the work that
// completed.
func (c *Client) downloadNonUnified(ctx context.Context, outDir string, outputs map[digest.Digest]*TreeOutput) (*MovedBytesMetadata, error) {
	dgs := make([]digest.Digest, 0, len(outputs))
	// statsMu protects fullStats across the download goroutines.
	var statsMu sync.Mutex
	fullStats := &MovedBytesMetadata{}
	if bool(c.useBatchOps) && bool(c.UtilizeLocality) {
		paths := make([]*TreeOutput, 0, len(outputs))
		for _, output := range outputs {
			paths = append(paths, output)
		}
		// Order the digests by destination path to utilize locality on disk
		// when writing files.
		sort.Slice(paths, func(i, j int) bool {
			return paths[i].Path < paths[j].Path
		})
		for _, path := range paths {
			dgs = append(dgs, path.Digest)
			fullStats.Requested += path.Digest.Size
		}
	} else {
		for dg := range outputs {
			dgs = append(dgs, dg)
			fullStats.Requested += dg.Size
		}
	}
	LogContextInfof(ctx, log.Level(2), "%d items to download", len(dgs))
	var batches [][]digest.Digest
	if c.useBatchOps {
		batches = c.makeBatches(ctx, dgs, !bool(c.UtilizeLocality))
	} else {
		LogContextInfof(ctx, log.Level(2), "Downloading them individually")
		for i := range dgs {
			LogContextInfof(ctx, log.Level(3), "Creating single batch of blob %s", dgs[i])
			batches = append(batches, dgs[i:i+1])
		}
	}
	eg, eCtx := errgroup.WithContext(ctx)
	for i, batch := range batches {
		i, batch := i, batch // https://golang.org/doc/faq#closures_and_goroutines
		eg.Go(func() error {
			if err := c.casDownloaders.Acquire(eCtx, 1); err != nil {
				return err
			}
			defer c.casDownloaders.Release(1)
			if i%logInterval == 0 {
				LogContextInfof(ctx, log.Level(2), "%d batches left to download", len(batches)-i)
			}
			if len(batch) > 1 {
				LogContextInfof(ctx, log.Level(3), "Downloading batch of %d files", len(batch))
				// The download error is only returned after the loop so that
				// blobs present in the returned map are still written.
				// NOTE(review): this presumes BatchDownloadBlobs can return
				// partial results alongside an error - confirm.
				bchMap, err := c.BatchDownloadBlobs(eCtx, batch)
				for _, dg := range batch {
					data, ok := bchMap[dg]
					if !ok && err != nil {
						// The blob was not downloaded; do not leave an empty
						// file behind in its place.
						continue
					}
					out := outputs[dg]
					perm := c.RegularMode
					if out.IsExecutable {
						perm = c.ExecutableMode
					}
					if err := ioutil.WriteFile(filepath.Join(outDir, out.Path), data, perm); err != nil {
						return err
					}
					statsMu.Lock()
					fullStats.LogicalMoved += int64(len(data))
					fullStats.RealMoved += int64(len(data))
					statsMu.Unlock()
				}
				if err != nil {
					return err
				}
			} else {
				out := outputs[batch[0]]
				path := filepath.Join(outDir, out.Path)
				LogContextInfof(ctx, log.Level(3), "Downloading single file with digest %s to %s", out.Digest, path)
				// Use the errgroup context so this download is cancelled as
				// soon as any other download in the group fails.
				stats, err := c.ReadBlobToFile(eCtx, out.Digest, path)
				if err != nil {
					return err
				}
				statsMu.Lock()
				fullStats.addFrom(stats)
				statsMu.Unlock()
				if out.IsExecutable {
					if err := os.Chmod(path, c.ExecutableMode); err != nil {
						return err
					}
				}
			}
			// Report cancellation even if this goroutine's own work succeeded.
			return eCtx.Err()
		})
	}
	LogContextInfof(ctx, log.Level(3), "Waiting for remaining jobs")
	err := eg.Wait()
	LogContextInfof(ctx, log.Level(3), "Done")
	return fullStats, err
}