in dax/internal/client/tubepool.go [118:176]
// getWithContext hands out a tube from the pool, blocking until one can be
// returned or the wait has to be abandoned.
//
// Every pass of the loop first tries to pop the tube on top of the idle stack.
// If the stack is empty it optionally kicks off a background allocation and
// then waits for the first of: a value on the shared waiters channel, a value
// on the private allocation channel, a value on p.errCh, or ctx being done.
// A receive that yields no tube simply starts the next pass.
//
// It returns os.ErrClosed when the pool is marked closed or when p.errCh
// yields a nil error, the non-nil error received from p.errCh otherwise, and
// ctx.Err() when ctx is done first.
func (p *tubePool) getWithContext(ctx context.Context, highPriority bool, opt RequestOptions) (tube, error) {
	for {
		p.mutex.Lock()
		switch {
		case p.closed:
			p.mutex.Unlock()
			return nil, os.ErrClosed
		case p.top != nil:
			// Pop the head of the idle stack and detach it from the list.
			idle := p.top
			p.top = idle.Next()
			if p.lastActive == idle {
				p.lastActive = p.top
			}
			idle.SetNext(nil)
			p.mutex.Unlock()
			return idle, nil
		}

		// Idle stack is empty: make sure a waiters channel exists and take
		// snapshots of it and of the session while the lock is still held.
		if p.waiters == nil {
			p.waiters = make(chan tube)
		}
		wakeup, session := p.waiters, p.session
		p.mutex.Unlock()

		// allocated stays nil unless a private allocation is started below;
		// receiving from a nil channel blocks forever, so its select case is
		// inert in that situation.
		var allocated chan tube
		switch {
		case p.gate.tryEnter():
			// Gate entered: allocate in the background without a private
			// result channel (allocated is still nil here). The `true`
			// presumably asks the goroutine to release the gate when done -
			// confirm against allocAndReleaseGate.
			go p.allocAndReleaseGate(session, allocated, true, opt)
		case highPriority:
			// Gate not entered, but high-priority callers still start an
			// allocation and listen for its result on a private channel.
			allocated = make(chan tube)
			go p.allocAndReleaseGate(session, allocated, false, opt)
		}

		select {
		case fromWaiters := <-wakeup:
			if fromWaiters != nil {
				return fromWaiters, nil
			}
			// A closed channel yields nil: go around and re-check the stack.
		case fresh := <-allocated:
			if fresh != nil {
				return fresh, nil
			}
		case poolErr := <-p.errCh:
			// A nil error means the error channel itself was closed.
			if poolErr == nil {
				return nil, os.ErrClosed
			}
			p.debugLog(opt, "TubePool for %s returned error : %s", p.address, poolErr)
			return nil, poolErr
		case <-ctx.Done():
			p.debugLog(opt, "Context.Done is closed in Pool %s. Error : %s", p.address, ctx.Err())
			return nil, ctx.Err()
		}
	}
}