in go/pkg/client/cas.go [116:174]
// uploadProcessor is the batching loop behind c.casUploadRequests. It loops
// until that channel is closed and reacts to three kinds of events:
//
//   - A regular request is appended to the pending batch; once the batch holds
//     at least c.UnifiedUploadBufferSize requests it is passed to c.upload.
//   - A cancellation request (req.cancel set) drops the matching pending
//     request and detaches req.wait from the upload state registered in
//     c.casUploads for the same digest. When no waiters remain on that state,
//     its cancel function (if any) is invoked and the entry is deleted.
//   - A ticker tick, every c.UnifiedUploadTickDuration, passes whatever is
//     pending to c.upload.
//
// When the request channel is closed, every request still pending receives an
// uploadResponse carrying context.Canceled and the loop returns.
func (c *Client) uploadProcessor() {
	var pending []*uploadRequest

	// flush hands the current batch to c.upload and starts a fresh one.
	// It does nothing when no requests are pending.
	flush := func() {
		if len(pending) == 0 {
			return
		}
		c.upload(pending)
		pending = nil
	}

	ticker := time.NewTicker(time.Duration(c.UnifiedUploadTickDuration))
	for {
		select {
		case <-ticker.C:
			flush()

		case req, open := <-c.casUploadRequests:
			if !open {
				// The request channel was closed: answer everything still
				// pending so that no caller is left blocked on its wait channel.
				ticker.Stop()
				for _, p := range pending {
					p.wait <- &uploadResponse{err: context.Canceled}
				}
				return
			}

			if !req.cancel {
				// Regular upload request: enqueue, and flush on a full batch.
				pending = append(pending, req)
				if len(pending) >= int(c.UnifiedUploadBufferSize) {
					flush()
				}
				continue
			}

			// Cancellation request. First drop the request from the batch
			// that has not been handed to c.upload yet; a pending request
			// matches when both its entry and its wait channel are the same.
			var kept []*uploadRequest
			for _, p := range pending {
				if p.ue == req.ue && p.wait == req.wait {
					continue
				}
				kept = append(kept, p)
			}
			pending = kept

			// Then detach the caller from any upload state already tracked
			// for this digest.
			st, tracked := c.casUploads[req.ue.Digest]
			if !tracked {
				continue
			}
			st.mu.Lock()
			var others []chan<- *uploadResponse
			for _, w := range st.clients {
				if w != req.wait {
					others = append(others, w)
				}
			}
			st.clients = others
			if len(others) == 0 {
				// Nobody is waiting on this digest any more: cancel the
				// tracked upload and forget its state.
				log.V(3).Infof("Cancelling Write %v", req.ue.Digest)
				if st.cancel != nil {
					st.cancel()
				}
				delete(c.casUploads, req.ue.Digest)
			}
			st.mu.Unlock()
		}
	}
}