func (c *Client) upload

in go/pkg/client/cas.go [260:379]


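// upload processes a slice of upload requests: it deduplicates them against
// uploads already in flight, queries the CAS for which digests are missing,
// and stores the missing blobs either batched or as individual streams.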
func (c *Client) upload(reqs []*uploadRequest) {
	// Collect new uploads.
	newStates := make(map[digest.Digest]*uploadState)
	var newUploads []digest.Digest
	var metas []*ContextMetadata
	log.V(2).Infof("Upload is processing %d requests", len(reqs))
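	// Piggyback on an in-flight upload of the same digest where possible;
	// otherwise register a new uploadState for it.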
	for _, req := range reqs {
		dg := req.ue.Digest
		st, ok := c.casUploads[dg]
		if ok {
			st.mu.Lock()
			if len(st.clients) > 0 {
				st.clients = append(st.clients, req.wait)
			} else {
				req.wait <- &uploadResponse{err: st.err, missing: false} // Digest is only needed when missing=true
			}
			st.mu.Unlock()
		} else {
			st = &uploadState{
				clients: []chan<- *uploadResponse{req.wait},
				ue:      req.ue,
			}
			c.casUploads[dg] = st
			newUploads = append(newUploads, dg)
			metas = append(metas, req.meta)
			newStates[dg] = st
		}
	}

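	// Merge the metadata of all participating requests so the upload RPCs
	// run under a single unified context.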
	unifiedMeta := getUnifiedMetadata(metas)
	ctx := context.Background()
	var err error
	if unifiedMeta.ActionID != "" {
		ctx, err = ContextWithMetadata(ctx, unifiedMeta)
	}
	if err != nil {
		for _, st := range newStates {
			updateAndNotify(st, 0, err, false)
		}
		return
	}
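	// Check which of the new digests the CAS already has; callers waiting on
	// already-present blobs are notified right away.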
	missing, present, err := c.findBlobState(ctx, newUploads)
	if err != nil {
		for _, st := range newStates {
			updateAndNotify(st, 0, err, false)
		}
		return
	}
	for _, dg := range present {
		updateAndNotify(newStates[dg], 0, nil, false)
	}

	LogContextInfof(ctx, log.Level(2), "%d new items to store", len(missing))
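	// With batch ops enabled, group the missing blobs into size-limited
	// batches; otherwise every blob becomes its own single-element batch.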
	var batches [][]digest.Digest
	if c.useBatchOps {
		batches = c.makeBatches(ctx, missing, true)
	} else {
		LogContextInfof(ctx, log.Level(2), "Uploading them individually")
		for i := range missing {
			LogContextInfof(ctx, log.Level(3), "Creating single batch of blob %s", missing[i])
			batches = append(batches, missing[i:i+1])
		}
	}

	for i, batch := range batches {
		i, batch := i, batch // https://golang.org/doc/faq#closures_and_goroutines
		go func() {
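			// Bound upload concurrency. Acquire only fails when ctx is done,
			// in which case the RPCs below fail fast anyway.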
			if c.casUploaders.Acquire(ctx, 1) == nil {
				defer c.casUploaders.Release(1)
			}
			if i%logInterval == 0 {
				LogContextInfof(ctx, log.Level(2), "%d batches left to store", len(batches)-i)
			}
			if len(batch) > 1 {
				LogContextInfof(ctx, log.Level(3), "Uploading batch of %d blobs", len(batch))
				bchMap := make(map[digest.Digest][]byte)
				totalBytesMap := make(map[digest.Digest]int64)
				for _, dg := range batch {
					st := newStates[dg]
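					// Blobs in a batch are chunked without compression; only
					// the streaming path below compresses.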
					ch, err := chunker.New(st.ue, false, int(c.ChunkMaxSize))
					if err != nil {
						updateAndNotify(st, 0, err, true)
						continue
					}
					data, err := ch.FullData()
					if err != nil {
						updateAndNotify(st, 0, err, true)
						continue
					}
					bchMap[dg] = data
					totalBytesMap[dg] = int64(len(data))
				}
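				// One RPC uploads the whole batch; every digest in it is
				// notified with the same error (if any).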
				err := c.BatchWriteBlobs(ctx, bchMap)
				for dg := range bchMap {
					updateAndNotify(newStates[dg], totalBytesMap[dg], err, true)
				}
			} else {
				LogContextInfof(ctx, log.Level(3), "Uploading single blob with digest %s", batch[0])
				st := newStates[batch[0]]
				st.mu.Lock()
				if len(st.clients) == 0 { // Already cancelled.
					log.V(3).Infof("Blob upload for digest %s was canceled", batch[0])
					st.mu.Unlock()
					return
				}
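				// Give this upload a cancelable context so a caller can abort
				// it through st.cancel.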
				cCtx, cancel := context.WithCancel(ctx)
				st.cancel = cancel
				st.mu.Unlock()
				dg := st.ue.Digest
				ch, err := chunker.New(st.ue, c.shouldCompress(dg.Size), int(c.ChunkMaxSize))
				if err != nil {
					updateAndNotify(st, 0, err, true)
					return // Don't attempt the write with an invalid chunker.
				}
				totalBytes, err := c.writeChunked(cCtx, c.writeRscName(dg), ch)
				updateAndNotify(st, totalBytes, err, true)
			}
		}()
	}
}
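
A minimal sketch of the calling convention implied above, assuming a caller in
the same package. The uploadRequest fields ue/wait/meta and the uploadResponse
shape are taken from the code; enqueueUpload itself, and calling upload
directly rather than through the client's request dispatcher, are illustrative
assumptions.

func (c *Client) enqueueUpload(ue *uploadinfo.Entry, meta *ContextMetadata) (*uploadResponse, error) {
	// Buffered so upload's synchronous notification (e.g. for an
	// already-present blob) never blocks on this caller.
	wait := make(chan *uploadResponse, 1)
	// NOTE: the real client serializes calls to upload on a dispatcher
	// goroutine, which also batches many requests into one slice.
	c.upload([]*uploadRequest{{ue: ue, wait: wait, meta: meta}})
	resp := <-wait
	return resp, resp.err
}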