bulk_indexer_pool.go (142 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package docappender import ( "context" "fmt" "sync" "sync/atomic" ) // BulkIndexerPool is a pool of BulkIndexer instances. It is designed to be // used in a concurrent environment where multiple goroutines may need to // acquire and release indexers. // // The pool allows a minimum number of BulkIndexers to be guaranteed per ID, a // maximum number of indexers per ID and an overall lease limit. This is useful // to ensure the pool does not grow too large, even if some IDs are slow to // release indexers. type BulkIndexerPool struct { indexers chan *BulkIndexer entries map[string]idEntry mu sync.RWMutex cond *sync.Cond // To wait/signal when a slot is available. leased atomic.Int64 // Total number of leased indexers across all IDs. // Read only fields. min, max, total int64 config BulkIndexerConfig } type idEntry struct { nonEmpty chan *BulkIndexer leased *atomic.Int64 } // NewBulkIndexerPool returns a new BulkIndexerPool with: // - The specified guaranteed indexers per ID // - The maximum number of concurrent BulkIndexers per ID // - A total (max) number of indexers to be leased per pool. // - The BulkIndexerConfig to use when creating new indexers. func NewBulkIndexerPool(guaranteed, max, total int, c BulkIndexerConfig) *BulkIndexerPool { p := BulkIndexerPool{ indexers: make(chan *BulkIndexer, total), entries: make(map[string]idEntry), min: int64(guaranteed), max: int64(max), total: int64(total), config: c, } p.cond = sync.NewCond(&p.mu) return &p } // Get returns a BulkIndexer for the specified ID as the ID is registered and // below the guaranteed minimum OR the local and overall limits. // // If the overall limit of indexers has been reached, it will wait until a slot // is available, blocking execution. // // If Deregister is called while waiting, an error and nil indexer is returned. func (p *BulkIndexerPool) Get(ctx context.Context, id string) (*BulkIndexer, error) { // Acquire the lock to ensure that we are not racing with other goroutines // that may be trying to acquire or release indexers. // Even though this looks like it would prevent other Get() operations from // proceeding, the lock is only held for a short time while we check the // count and overall limits. The lock is released by: // - p.cond.Wait() releases the lock while waiting for a signal. // - p.mu.Unlock() releases the lock after the indexer is returned. p.mu.Lock() defer p.mu.Unlock() for { // Get the entry inside the loop in case it is deregistered while // waiting, or if the ID is not registered. entry, exists := p.entries[id] if !exists { return nil, fmt.Errorf("bulk indexer pool: id %q not registered", id) } // Always allow minimum indexers to be leased, regardless of the // overall limit. This ensures that the minimum number of indexers // are always available for each ID. underGuaranteed := entry.leased.Load() < p.min if underGuaranteed { return p.get(ctx, entry) } // Only allow indexers to be leased if both the local and overall // limits have not been reached. underLocalMax := entry.leased.Load() < p.max underTotal := p.leased.Load() < p.total if underTotal && underLocalMax { return p.get(ctx, entry) } // Waits until Put/Deregister is called. This allows waiting for a // slot to become available without busy waiting. // When Wait() is called, the mutex is unlocked while waiting. // After Wait() returns, the mutex is automatically locked. p.cond.Wait() } } // get returns a BulkIndexer for the specified ID. It is assumed that the // caller has already acquired the lock (read or write) and checked the count // and overall limits. func (p *BulkIndexerPool) get(ctx context.Context, entry idEntry) ( idx *BulkIndexer, err error, ) { defer func() { // Only increment the leased count an indexer is returned. if idx != nil { entry.leased.Add(1) p.leased.Add(1) } }() // First, try to return an existing non-empty indexer (if any). select { case idx = <-entry.nonEmpty: return default: } // If there aren't any non-empty indexers, either return an // existing indexer or create a new one if none are available. select { case idx = <-p.indexers: case <-ctx.Done(): err = ctx.Err() default: idx = newBulkIndexer(p.config) } return } // Put returns the BulkIndexer to the pool. If the indexer is non-empty, it is // stored in the non-empty channel for the ID. Otherwise, it is returned to the // general pool. // After calling Put() no references to the indexer should be stored, since // doing so may lead to undefined behavior and unintended memory sharing. func (p *BulkIndexerPool) Put(id string, indexer *BulkIndexer) { if indexer == nil { return // No indexer to store, nothing to do. } p.mu.RLock() defer p.mu.RUnlock() entry, exists := p.entries[id] if !exists { return // unknown id, discard indexer } defer func() { // Always decrement the count and signal the condition. entry.leased.Add(-1) p.leased.Add(-1) p.cond.Broadcast() // Signal waiting goroutines }() if indexer.Items() > 0 { entry.nonEmpty <- indexer // Never discard non-empty indexers. return } select { case p.indexers <- indexer: // Return to the pool for later reuse. default: indexer = nil // If the pool is full, discard the indexer. } } // Register adds an ID to the pool. If the ID already exists, it does nothing. // This is useful for ensuring that the ID is registered before any indexers // are acquired for it. func (p *BulkIndexerPool) Register(id string) { if id == "" { return // No ID to register. } p.mu.Lock() defer p.mu.Unlock() if _, exists := p.entries[id]; exists { return } p.entries[id] = idEntry{ nonEmpty: make(chan *BulkIndexer, p.max), leased: new(atomic.Int64), } } // Deregister removes the id from the pool and returns a closed BulkIndexer // channel with all the non-empty indexers associated with the ID. func (p *BulkIndexerPool) Deregister(id string) <-chan *BulkIndexer { if id == "" { return nil // No ID to deregister. } p.mu.Lock() defer func() { p.mu.Unlock() p.cond.Broadcast() // Signal ALL waiting goroutines }() entry, ok := p.entries[id] if !ok { // To handle when the ID is not found. entry.nonEmpty = make(chan *BulkIndexer) } delete(p.entries, id) close(entry.nonEmpty) return entry.nonEmpty } func (p *BulkIndexerPool) count(id string) int64 { if id == "" { return -1 } p.mu.RLock() defer p.mu.RUnlock() if entry, exists := p.entries[id]; exists { return entry.leased.Load() } return -1 }