in pkg/genlib/fields/cache.go [57:98]
// LoadFields returns the Fields for the given integration, stream and version.
//
// Results are cached per (integration, stream, version) tuple. On a cache miss
// the fields are obtained from the package-level LoadFields using f.baseUrl,
// stored in the cache and returned. Loads are gated by f.sema to limit the
// number of parallel outbound transactions; ctx is passed both to the
// semaphore acquisition and to the load itself.
//
// A nil Fields and a non-nil error are returned when the semaphore cannot be
// acquired or when loading fails. Failed loads are not cached.
func (f *Cache) LoadFields(ctx context.Context, integration, stream, version string) (Fields, error) {
	key := tuple{
		integration: integration,
		stream:      stream,
		version:     version,
	}

	// Fast path: serve from the cache under the read lock.
	f.mut.RLock()
	flds, ok := f.fields[key]
	f.mut.RUnlock()
	if ok {
		return flds, nil
	}

	// Limit the number of parallel outbound transactions.
	if err := f.sema.Acquire(ctx, 1); err != nil {
		return nil, err
	}
	defer f.sema.Release(1)

	// Check again after acquiring the semaphore: another goroutine may have
	// loaded and cached the same key while this one was waiting.
	f.mut.RLock()
	flds, ok = f.fields[key]
	f.mut.RUnlock()
	if ok {
		return flds, nil
	}

	// The second value returned by LoadFields is not used by the cache.
	flds, _, err := LoadFields(ctx, f.baseUrl, integration, stream, version)
	if err != nil {
		return nil, err
	}

	f.mut.Lock()
	f.fields[key] = flds
	f.mut.Unlock()

	return flds, nil
}