in go/pkg/client/cas.go [1503:1583]
func (c *Client) download(data []*downloadRequest) {
// It is possible to have multiple same files download to different locations.
// This will download once and copy to the other locations.
reqs := make(map[digest.Digest][]*downloadRequest)
var metas []*ContextMetadata
for _, r := range data {
rs := reqs[r.digest]
rs = append(rs, r)
reqs[r.digest] = rs
metas = append(metas, r.meta)
}
var dgs []digest.Digest
if bool(c.useBatchOps) && bool(c.UtilizeLocality) {
paths := make([]*TreeOutput, 0, len(data))
for _, r := range data {
paths = append(paths, r.output)
}
// This is to utilize locality in disk when writing files.
sort.Slice(paths, func(i, j int) bool {
return paths[i].Path < paths[j].Path
})
for _, path := range paths {
dgs = append(dgs, path.Digest)
}
} else {
for dg := range reqs {
dgs = append(dgs, dg)
}
}
unifiedMeta := getUnifiedMetadata(metas)
var err error
ctx := context.Background()
if unifiedMeta.ActionID != "" {
ctx, err = ContextWithMetadata(context.Background(), unifiedMeta)
}
if err != nil {
afterDownload(dgs, reqs, map[digest.Digest]*MovedBytesMetadata{}, err)
return
}
LogContextInfof(ctx, log.Level(2), "%d digests to download (%d reqs)", len(dgs), len(reqs))
var batches [][]digest.Digest
if c.useBatchOps {
batches = c.makeBatches(ctx, dgs, !bool(c.UtilizeLocality))
} else {
LogContextInfof(ctx, log.Level(2), "Downloading them individually")
for i := range dgs {
LogContextInfof(ctx, log.Level(3), "Creating single batch of blob %s", dgs[i])
batches = append(batches, dgs[i:i+1])
}
}
for i, batch := range batches {
i, batch := i, batch // https://golang.org/doc/faq#closures_and_goroutines
go func() {
if c.casDownloaders.Acquire(ctx, 1) == nil {
defer c.casDownloaders.Release(1)
}
if i%logInterval == 0 {
LogContextInfof(ctx, log.Level(2), "%d batches left to download", len(batches)-i)
}
if len(batch) > 1 {
c.downloadBatch(ctx, batch, reqs)
} else {
rs := reqs[batch[0]]
downloadCtx := ctx
if len(rs) == 1 {
// We have only one download request for this digest.
// Download on same context as the issuing request, to support proper cancellation.
downloadCtx = rs[0].context
}
c.downloadSingle(downloadCtx, batch[0], reqs)
}
}()
}
}