func()

in bulk_indexer_pool.go [77:114]


func (p *BulkIndexerPool) Get(ctx context.Context, id string) (*BulkIndexer, error) {
	// Acquire the lock to ensure that we are not racing with other goroutines
	// that may be trying to acquire or release indexers.
	// Even though this looks like it would prevent other Get() operations from
	// proceeding, the lock is only held for a short time while we check the
	// count and overall limits. The lock is released by:
	// - p.cond.Wait() releases the lock while waiting for a signal.
	// - p.mu.Unlock() releases the lock after the indexer is returned.
	p.mu.Lock()
	defer p.mu.Unlock()
	for {
		// Get the entry inside the loop in case it is deregistered while
		// waiting, or if the ID is not registered.
		entry, exists := p.entries[id]
		if !exists {
			return nil, fmt.Errorf("bulk indexer pool: id %q not registered", id)
		}
		// Always allow minimum indexers to be leased, regardless of the
		// overall limit. This ensures that the minimum number of indexers
		// are always available for each ID.
		underGuaranteed := entry.leased.Load() < p.min
		if underGuaranteed {
			return p.get(ctx, entry)
		}
		// Only allow indexers to be leased if both the local and overall
		// limits have not been reached.
		underLocalMax := entry.leased.Load() < p.max
		underTotal := p.leased.Load() < p.total
		if underTotal && underLocalMax {
			return p.get(ctx, entry)
		}
		// Waits until Put/Deregister is called. This allows waiting for a
		// slot to become available without busy waiting.
		// When Wait() is called, the mutex is unlocked while waiting.
		// After Wait() returns, the mutex is automatically locked.
		p.cond.Wait()
	}
}