in lib/lib.go [71:213]
func (obj *Core) Run(ctx context.Context) (interfaces.ResultSet, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel() // can be safely called more than once
iterators := []interfaces.Iterator{} // list of all iterators
iterators = append(iterators, obj.Iterators...)
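// start with the top-level iterators; this list grows below as Recurse returns children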
// pair each scanner with the iterator that produced it, so the collector
// goroutine never reads the iterators slice while this loop appends to it
type scanJob struct {
scanner *Scanner
iterator interfaces.Iterator
}
scanners := make(chan *scanJob) // one job (scanner + iterator) per iterator
allResultSets := make(map[string]map[interfaces.Backend]*interfaces.Result)
resultErrors := []error{}
wg := &sync.WaitGroup{}
defer wg.Wait()
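// the deferred Wait is a safety net for early returns; the normal path
// calls wg.Wait() explicitly below before reading the collected results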
wg.Add(1)
go func() { // collect results in parallel so we don't block an iterator
defer wg.Done()
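// i counts the jobs received so far; it's only used in the log messages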
i := -1
for job := range scanners { // receive
i++ // counter
if obj.Debug {
obj.Logf("result(%d) wait", i)
}
results, err := job.scanner.Result() // these contain a wg
if obj.Debug {
obj.Logf("result(%d) done", i)
}
if err != nil {
resultErrors = append(resultErrors, err)
}
// done scanning, so unlock this!
if err := job.iterator.Close(); err != nil {
resultErrors = append(resultErrors, err)
}
for _, m := range results {
for _, result := range m {
// tag (annotate) the result
tagResultIterator(result, job.iterator)
}
}
// inefficient, but fine for now
allResultSets, err = interfaces.MergeResultSets(allResultSets, results)
if err != nil {
resultErrors = append(resultErrors, err)
}
}
}()
obj.Logf("starting with %d iterators...", len(iterators))
obj.Logf("running over %d backends...", len(obj.Backends))
errors := []error{}
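// the channel close runs via the defer on early returns and explicitly
// below, so guard it with a sync.Once to avoid closing it twice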
once := &sync.Once{}
closeFnDo := func() { close(scanners) }
closeFn := func() { once.Do(closeFnDo) }
defer closeFn()
// NOTE: this is really a "while" loop: the bound is re-evaluated on
// every pass, because the list can grow while we iterate over it.
Loop:
for i := 0; len(iterators) > i; i++ {
x := iterators[i]
defer func() {
// TODO: capture err and return it.
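// NOTE: Close may also run in the collector goroutine for this
// iterator; we assume it is safe to call more than once.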
x.Close()
}()
// build a scanner (a helper wrapper) to run the backend Scan* functions
scanner := &Scanner{
Debug: obj.Debug,
Logf: func(format string, v ...interface{}) {
obj.Logf("scanner: "+format, v...)
},
Backends: obj.Backends,
}
if err := scanner.Init(); err != nil {
return nil, errwrap.Wrapf(err, "scanner init failed")
}
defer scanner.Result() // acts as a Wait() so we never return with a scanner still running
//scanners = append(scanners, scanner)
if obj.Debug {
obj.Logf("running iterator(%d): %s", i, x)
}
if err := x.Validate(); err != nil {
return nil, errwrap.Wrapf(err, "iterator validate failed")
}
// Mechanism to end this long iterator loop early if needed... We
// check for cancellation ourselves and break out of the loop as soon
// as we see it, rather than waiting for the remaining work to notice
// the cancelled context.
select {
case <-ctx.Done():
errors = append(errors, ctx.Err())
break Loop // NOTE: a bare break would only exit this select
default:
}
if obj.Debug {
obj.Logf("recurse(%d) start", i)
}
it, err := x.Recurse(ctx, scanner.Scan)
if obj.Debug {
obj.Logf("recurse(%d) done", i)
}
if err != nil {
if obj.ShutdownOnError {
// this will trigger the ctx cancel() in defer
return nil, errwrap.Wrapf(err, "recurse error with: %s", x)
}
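// not a fatal error: record it and move on to the next iterator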
errors = append(errors, err)
continue
}
// Don't unlock here in case something is running in parallel... We
// wait until *after* Recurse has finished before we send on the
// channel, because once we do, the collector goroutine calls the
// scanner's Result method, which should only happen *after* the
// results are ready.
select {
case scanners <- &scanJob{scanner: scanner, iterator: x}: // send
case <-ctx.Done():
errors = append(errors, ctx.Err())
break Loop // NOTE: a bare break would only exit this select
}
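// Recurse may have discovered child iterators; append them so that
// later passes of this loop will run them too.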
iterators = append(iterators, it...)
}
closeFn() // done sending on channel
//obj.wg.Wait() // wait for everything to finish
wg.Wait() // wait for goroutine to exit
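// after Wait returns, it is safe to read resultErrors without any locking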
errors = append(errors, resultErrors...) // from the goroutine
if len(errors) > 0 {
var ea error
for _, e := range errors {
ea = errwrap.Append(ea, e)
}
return nil, errwrap.Wrapf(ea, "core run errored")
}
return allResultSets, nil
}