func()

in dax/internal/client/tubepool.go [118:176]


// getWithContext hands out a tube from the pool, blocking until one becomes
// available, the pool reports an error, or ctx is done.
//
// Each pass of the loop first tries to pop an idle tube from the pool's stack.
// When the stack is empty it may start allocAndReleaseGate in a goroutine and
// then waits on the pool's channels.
//
// Returns:
//   - a tube and a nil error on success;
//   - os.ErrClosed if the pool is marked closed, or if a nil error is received
//     from p.errCh;
//   - the non-nil error received from p.errCh;
//   - ctx.Err() once ctx.Done() fires.
func (p *tubePool) getWithContext(ctx context.Context, highPriority bool, opt RequestOptions) (tube, error) {
	for {
		p.mutex.Lock()
		if p.closed {
			p.mutex.Unlock()
			return nil, os.ErrClosed
		}

		// Fast path: take the idle tube sitting on top of the stack.
		if idle := p.top; idle != nil {
			p.top = idle.Next()
			// Keep lastActive pointing into the stack if it referenced the
			// tube being removed.
			if p.lastActive == idle {
				p.lastActive = p.top
			}
			// Detach the tube from the stack before handing it out.
			idle.SetNext(nil)
			p.mutex.Unlock()
			return idle, nil
		}

		// Stack is empty: make sure a waiters channel exists, then snapshot the
		// state needed after the lock is released.
		if p.waiters == nil {
			p.waiters = make(chan tube)
		}
		waiters := p.waiters
		session := p.session
		p.mutex.Unlock()

		// allocCh stays nil unless the high-priority path below creates it; a
		// receive on a nil channel never becomes ready, so the corresponding
		// select case is inert in that situation.
		var allocCh chan tube
		switch {
		case p.gate.tryEnter():
			// Gate entered: the goroutine is told (third argument true) to
			// release it. NOTE(review): presumably this allocates a new tube
			// for the pool; allocAndReleaseGate is not visible here, confirm.
			go p.allocAndReleaseGate(session, allocCh, true, opt)
		case highPriority:
			// Gate not entered, but high-priority callers still start the
			// goroutine, this time with a dedicated channel and no gate release.
			allocCh = make(chan tube)
			go p.allocAndReleaseGate(session, allocCh, false, opt)
		}

		select {
		case handed := <-waiters:
			if handed != nil {
				return handed, nil
			}
			// A nil value means the channel was closed; loop around and look
			// for idle tubes in the stack again.
		case fresh := <-allocCh:
			if fresh != nil {
				return fresh, nil
			}
			// Nothing usable was delivered; retry from the top of the loop.
		case err := <-p.errCh:
			// A nil error means errCh itself was closed.
			if err == nil {
				return nil, os.ErrClosed
			}
			p.debugLog(opt, "TubePool for %s returned error : %s", p.address, err)
			return nil, err
		case <-ctx.Done():
			p.debugLog(opt, "Context.Done is closed in Pool %s. Error : %s", p.address, ctx.Err())
			return nil, ctx.Err()
		}
	}
}