func()

in go/pkg/client/cas.go [1589:1689]


func (c *Client) downloadNonUnified(ctx context.Context, outDir string, outputs map[digest.Digest]*TreeOutput) (*MovedBytesMetadata, error) {
	var dgs []digest.Digest
	// statsMu protects stats across threads.
	statsMu := sync.Mutex{}
	fullStats := &MovedBytesMetadata{}

	if bool(c.useBatchOps) && bool(c.UtilizeLocality) {
		paths := make([]*TreeOutput, 0, len(outputs))
		for _, output := range outputs {
			paths = append(paths, output)
		}

		// This is to utilize locality in disk when writing files.
		sort.Slice(paths, func(i, j int) bool {
			return paths[i].Path < paths[j].Path
		})

		for _, path := range paths {
			dgs = append(dgs, path.Digest)
			fullStats.Requested += path.Digest.Size
		}
	} else {
		for dg := range outputs {
			dgs = append(dgs, dg)
			fullStats.Requested += dg.Size
		}
	}

	LogContextInfof(ctx, log.Level(2), "%d items to download", len(dgs))
	var batches [][]digest.Digest
	if c.useBatchOps {
		batches = c.makeBatches(ctx, dgs, !bool(c.UtilizeLocality))
	} else {
		LogContextInfof(ctx, log.Level(2), "Downloading them individually")
		for i := range dgs {
			LogContextInfof(ctx, log.Level(3), "Creating single batch of blob %s", dgs[i])
			batches = append(batches, dgs[i:i+1])
		}
	}

	eg, eCtx := errgroup.WithContext(ctx)
	for i, batch := range batches {
		i, batch := i, batch // https://golang.org/doc/faq#closures_and_goroutines
		eg.Go(func() error {
			if err := c.casDownloaders.Acquire(eCtx, 1); err != nil {
				return err
			}
			defer c.casDownloaders.Release(1)
			if i%logInterval == 0 {
				LogContextInfof(ctx, log.Level(2), "%d batches left to download", len(batches)-i)
			}
			if len(batch) > 1 {
				LogContextInfof(ctx, log.Level(3), "Downloading batch of %d files", len(batch))
				bchMap, err := c.BatchDownloadBlobs(eCtx, batch)
				for _, dg := range batch {
					data := bchMap[dg]
					out := outputs[dg]
					perm := c.RegularMode
					if out.IsExecutable {
						perm = c.ExecutableMode
					}
					if err := ioutil.WriteFile(filepath.Join(outDir, out.Path), data, perm); err != nil {
						return err
					}
					statsMu.Lock()
					fullStats.LogicalMoved += int64(len(data))
					fullStats.RealMoved += int64(len(data))
					statsMu.Unlock()
				}
				if err != nil {
					return err
				}
			} else {
				out := outputs[batch[0]]
				path := filepath.Join(outDir, out.Path)
				LogContextInfof(ctx, log.Level(3), "Downloading single file with digest %s to %s", out.Digest, path)
				stats, err := c.ReadBlobToFile(ctx, out.Digest, path)
				if err != nil {
					return err
				}
				statsMu.Lock()
				fullStats.addFrom(stats)
				statsMu.Unlock()
				if out.IsExecutable {
					if err := os.Chmod(path, c.ExecutableMode); err != nil {
						return err
					}
				}
			}
			if eCtx.Err() != nil {
				return eCtx.Err()
			}
			return nil
		})
	}

	LogContextInfof(ctx, log.Level(3), "Waiting for remaining jobs")
	err := eg.Wait()
	LogContextInfof(ctx, log.Level(3), "Done")
	return fullStats, err
}