pkg/genlib/fields/cache.go (76 lines of code) (raw):

package fields import ( "context" "golang.org/x/sync/semaphore" "sync" ) const ( ProductionBaseURL = "https://epr.elastic.co/" maxParallel = 4 ) type tuple struct { integration string stream string version string } type Manifest struct { Title string `config:"title"` Type string `config:"type"` DataSet string `config:"dataset"` } type CacheOption func(*Cache) func WithBaseUrl(url string) CacheOption { return func(c *Cache) { c.baseUrl = url } } type Cache struct { mut sync.RWMutex sema *semaphore.Weighted baseUrl string fields map[tuple]Fields manifest map[tuple]Manifest } func NewCache(opts ...CacheOption) *Cache { c := &Cache{ baseUrl: ProductionBaseURL, sema: semaphore.NewWeighted(maxParallel), fields: make(map[tuple]Fields), manifest: make(map[tuple]Manifest), } for _, opt := range opts { opt(c) } return c } func (f *Cache) LoadFields(ctx context.Context, integration, stream, version string) (Fields, error) { var err error t := tuple{ integration: integration, stream: stream, version: version, } f.mut.RLock() flds, ok := f.fields[t] f.mut.RUnlock() if ok { return flds, nil } // Limit the number of parallel outbound transactions if err = f.sema.Acquire(ctx, 1); err != nil { return nil, err } defer f.sema.Release(1) // Check again after aquiring semaphore; fields may have been retrieved by another thread f.mut.RLock() flds, ok = f.fields[t] f.mut.RUnlock() if !ok { if flds, _, err = LoadFields(ctx, f.baseUrl, integration, stream, version); err != nil { return nil, err } else { f.mut.Lock() f.fields[t] = flds f.mut.Unlock() } } return flds, nil }