in upload/concurrent/worker.go [45:104]
func (w *Worker) Run(tearDownChan <-chan bool) {
go func() {
defer func() {
// Signal balancer that worker is finished
w.workerFinishedChan <- w
}()
var requestToHandle *Request
var ok bool
for {
select {
case requestToHandle, ok = <-w.RequestsToHandleChan:
if !ok {
// Request channel is closed and drained, worker can try to steal work from others.
//
// Note: load balancer does not play any role in stealing, load balancer closes send-end
// of all worker queue's at the same time, at this point we are sure that no more new job
// will be scheduled. Once we start stealing "Worker::Pending" won't reflect correct load.
requestToHandle = w.tryStealWork()
if requestToHandle == nil {
// Could not steal then return
return
}
}
case <-tearDownChan:
// immediate stop, no need to drain the request channel
return
}
var err error
// Do work, retry on failure.
Loop:
for count := 0; count < maxRetryCount+1; count++ {
select {
case <-tearDownChan:
return
default:
err = requestToHandle.Work() // Run work
if err == nil || !requestToHandle.ShouldRetry(err) {
break Loop
}
}
}
if err != nil {
select {
case w.errorChan <- fmt.Errorf("%s: %v", requestToHandle.ID, err):
case <-tearDownChan:
return
}
}
select {
case w.requestHandledChan <- w: // One work finished (successfully or unsuccessfully)
case <-tearDownChan:
return
}
}
}()
}