func()

in go/pkg/client/cas.go [116:174]


func (c *Client) uploadProcessor() {
	var buffer []*uploadRequest
	ticker := time.NewTicker(time.Duration(c.UnifiedUploadTickDuration))
	for {
		select {
		case req, ok := <-c.casUploadRequests:
			if !ok {
				// Client is exiting. Notify remaining uploads to prevent deadlocks.
				ticker.Stop()
				if buffer != nil {
					for _, r := range buffer {
						r.wait <- &uploadResponse{err: context.Canceled}
					}
				}
				return
			}
			if !req.cancel {
				buffer = append(buffer, req)
				if len(buffer) >= int(c.UnifiedUploadBufferSize) {
					c.upload(buffer)
					buffer = nil
				}
				continue
			}
			// Cancellation request.
			var newBuffer []*uploadRequest
			for _, r := range buffer {
				if r.ue != req.ue || r.wait != req.wait {
					newBuffer = append(newBuffer, r)
				}
			}
			buffer = newBuffer
			st, ok := c.casUploads[req.ue.Digest]
			if ok {
				st.mu.Lock()
				var remainingClients []chan<- *uploadResponse
				for _, w := range st.clients {
					if w != req.wait {
						remainingClients = append(remainingClients, w)
					}
				}
				st.clients = remainingClients
				if len(st.clients) == 0 {
					log.V(3).Infof("Cancelling Write %v", req.ue.Digest)
					if st.cancel != nil {
						st.cancel()
					}
					delete(c.casUploads, req.ue.Digest)
				}
				st.mu.Unlock()
			}
		case <-ticker.C:
			if buffer != nil {
				c.upload(buffer)
				buffer = nil
			}
		}
	}
}