in bulk_indexer_pool.go [77:114]
func (p *BulkIndexerPool) Get(ctx context.Context, id string) (*BulkIndexer, error) {
// Acquire the lock to ensure that we are not racing with other goroutines
// that may be trying to acquire or release indexers.
// Even though this looks like it would prevent other Get() operations from
// proceeding, the lock is only held for a short time while we check the
// count and overall limits. The lock is released by:
// - p.cond.Wait() releases the lock while waiting for a signal.
// - p.mu.Unlock() releases the lock after the indexer is returned.
p.mu.Lock()
defer p.mu.Unlock()
for {
// Get the entry inside the loop in case it is deregistered while
// waiting, or if the ID is not registered.
entry, exists := p.entries[id]
if !exists {
return nil, fmt.Errorf("bulk indexer pool: id %q not registered", id)
}
// Always allow minimum indexers to be leased, regardless of the
// overall limit. This ensures that the minimum number of indexers
// are always available for each ID.
underGuaranteed := entry.leased.Load() < p.min
if underGuaranteed {
return p.get(ctx, entry)
}
// Only allow indexers to be leased if both the local and overall
// limits have not been reached.
underLocalMax := entry.leased.Load() < p.max
underTotal := p.leased.Load() < p.total
if underTotal && underLocalMax {
return p.get(ctx, entry)
}
// Waits until Put/Deregister is called. This allows waiting for a
// slot to become available without busy waiting.
// When Wait() is called, the mutex is unlocked while waiting.
// After Wait() returns, the mutex is automatically locked.
p.cond.Wait()
}
}