in go/pkg/client/cas.go [260:379]
func (c *Client) upload(reqs []*uploadRequest) {
// Collect new uploads.
newStates := make(map[digest.Digest]*uploadState)
var newUploads []digest.Digest
var metas []*ContextMetadata
log.V(2).Infof("Upload is processing %d requests", len(reqs))
for _, req := range reqs {
dg := req.ue.Digest
st, ok := c.casUploads[dg]
if ok {
st.mu.Lock()
if len(st.clients) > 0 {
st.clients = append(st.clients, req.wait)
} else {
req.wait <- &uploadResponse{err: st.err, missing: false} // Digest is only needed when missing=true
}
st.mu.Unlock()
} else {
st = &uploadState{
clients: []chan<- *uploadResponse{req.wait},
ue: req.ue,
}
c.casUploads[dg] = st
newUploads = append(newUploads, dg)
metas = append(metas, req.meta)
newStates[dg] = st
}
}
unifiedMeta := getUnifiedMetadata(metas)
var err error
ctx := context.Background()
if unifiedMeta.ActionID != "" {
ctx, err = ContextWithMetadata(context.Background(), unifiedMeta)
}
if err != nil {
for _, st := range newStates {
updateAndNotify(st, 0, err, false)
}
return
}
missing, present, err := c.findBlobState(ctx, newUploads)
if err != nil {
for _, st := range newStates {
updateAndNotify(st, 0, err, false)
}
return
}
for _, dg := range present {
updateAndNotify(newStates[dg], 0, nil, false)
}
LogContextInfof(ctx, log.Level(2), "%d new items to store", len(missing))
var batches [][]digest.Digest
if c.useBatchOps {
batches = c.makeBatches(ctx, missing, true)
} else {
LogContextInfof(ctx, log.Level(2), "Uploading them individually")
for i := range missing {
LogContextInfof(ctx, log.Level(3), "Creating single batch of blob %s", missing[i])
batches = append(batches, missing[i:i+1])
}
}
for i, batch := range batches {
i, batch := i, batch // https://golang.org/doc/faq#closures_and_goroutines
go func() {
if c.casUploaders.Acquire(ctx, 1) == nil {
defer c.casUploaders.Release(1)
}
if i%logInterval == 0 {
LogContextInfof(ctx, log.Level(2), "%d batches left to store", len(batches)-i)
}
if len(batch) > 1 {
LogContextInfof(ctx, log.Level(3), "Uploading batch of %d blobs", len(batch))
bchMap := make(map[digest.Digest][]byte)
totalBytesMap := make(map[digest.Digest]int64)
for _, dg := range batch {
st := newStates[dg]
ch, err := chunker.New(st.ue, false, int(c.ChunkMaxSize))
if err != nil {
updateAndNotify(st, 0, err, true)
continue
}
data, err := ch.FullData()
if err != nil {
updateAndNotify(st, 0, err, true)
continue
}
bchMap[dg] = data
totalBytesMap[dg] = int64(len(data))
}
err := c.BatchWriteBlobs(ctx, bchMap)
for dg := range bchMap {
updateAndNotify(newStates[dg], totalBytesMap[dg], err, true)
}
} else {
LogContextInfof(ctx, log.Level(3), "Uploading single blob with digest %s", batch[0])
st := newStates[batch[0]]
st.mu.Lock()
if len(st.clients) == 0 { // Already cancelled.
log.V(3).Infof("Blob upload for digest %s was canceled", batch[0])
st.mu.Unlock()
return
}
cCtx, cancel := context.WithCancel(ctx)
st.cancel = cancel
st.mu.Unlock()
dg := st.ue.Digest
log.V(3).Infof("Uploading single blob with digest %s", batch[0])
ch, err := chunker.New(st.ue, c.shouldCompress(dg.Size), int(c.ChunkMaxSize))
if err != nil {
updateAndNotify(st, 0, err, true)
}
totalBytes, err := c.writeChunked(cCtx, c.writeRscName(dg), ch)
updateAndNotify(st, totalBytes, err, true)
}
}()
}
}