func()

in lib/lib.go [71:213]


// Run processes every iterator in obj.Iterators, plus any further iterators
// that each Recurse call returns, and merges the scan results of all of them
// into a single result set.
//
// For each iterator a fresh Scanner is built over obj.Backends, the iterator is
// validated, and Recurse is called with the scanner's Scan method. Once Recurse
// has returned, the scanner is handed to a collector goroutine which waits for
// its results, closes the iterator, tags each result with the iterator it came
// from, and merges it into the overall result set. Collection runs in parallel
// so that waiting on one scanner does not hold up the next iterator.
//
// If ctx is cancelled, the iterator loop stops and the context error is
// reported. If Recurse fails and obj.ShutdownOnError is set, Run returns that
// error immediately; otherwise the error is recorded and the remaining
// iterators are still processed. Scanner init and iterator validation failures
// always return immediately. When any error was recorded, the returned result
// set is nil and the errors are returned together, wrapped as a single error.
func (obj *Core) Run(ctx context.Context) (interfaces.ResultSet, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // can be safely called more than once

	// job pairs a scanner with the iterator it was run for. The collector
	// goroutine must not index into the iterators slice itself: that slice
	// is grown by the main loop while the collector runs, and iterators
	// whose Recurse failed never produce a scanner, so a simple receive
	// counter would drift away from the real iterator index.
	type job struct {
		index    int                 // position of the iterator, for logging
		iterator interfaces.Iterator // iterator that produced these results
		scanner  *Scanner            // scanner holding the pending results
	}

	iterators := []interfaces.Iterator{} // list of all iterators
	iterators = append(iterators, obj.Iterators...)

	jobs := make(chan *job) // one job for each successfully recursed iterator

	allResultSets := make(map[string]map[interfaces.Backend]*interfaces.Result)
	resultErrors := []error{}

	wg := &sync.WaitGroup{}
	defer wg.Wait()
	wg.Add(1)
	go func() { // collect results in parallel so we don't block an iterator
		defer wg.Done()
		for j := range jobs { // receive
			if obj.Debug {
				obj.Logf("result(%d) wait", j.index)
			}
			results, err := j.scanner.Result() // these contain a wg
			if obj.Debug {
				obj.Logf("result(%d) done", j.index)
			}
			if err != nil {
				resultErrors = append(resultErrors, err)
			}
			// done scanning, so unlock this!
			if err := j.iterator.Close(); err != nil {
				resultErrors = append(resultErrors, err)
			}

			for _, m := range results {
				for _, result := range m {
					// tag (annotate) the result
					tagResultIterator(result, j.iterator)
				}
			}

			// inefficient, but fine for now
			allResultSets, err = interfaces.MergeResultSets(allResultSets, results)
			if err != nil {
				resultErrors = append(resultErrors, err)
			}
		}
	}()

	obj.Logf("starting with %d iterators...", len(iterators))
	obj.Logf("running over %d backends...", len(obj.Backends))
	runErrors := []error{}
	once := &sync.Once{}
	closeFnDo := func() { close(jobs) }
	closeFn := func() { once.Do(closeFnDo) }
	defer closeFn() // lets the collector exit on every early return path

	// The loop bound is re-evaluated on every pass because Recurse can
	// return more iterators, which get appended to the list as we go.
Loop:
	for i := 0; i < len(iterators); i++ {
		x := iterators[i]
		defer func() {
			// TODO: capture err and return it.
			x.Close()
		}()

		// helper function builder/wrapper to run backend Scan* functions
		scanner := &Scanner{
			Debug: obj.Debug,
			Logf: func(format string, v ...interface{}) {
				obj.Logf("scanner: "+format, v...)
			},

			Backends: obj.Backends,
		}
		if err := scanner.Init(); err != nil {
			return nil, errwrap.Wrapf(err, "scanner init failed")
		}
		defer scanner.Result() // Wait()

		if obj.Debug {
			obj.Logf("running iterator(%d): %s", i, x)
		}
		if err := x.Validate(); err != nil {
			return nil, errwrap.Wrapf(err, "iterator validate failed")
		}

		// Mechanism to end this long iterator loop early if needed...
		// A bare `break` inside a select only leaves the select, so we
		// need the labelled form to actually stop iterating.
		select {
		case <-ctx.Done():
			runErrors = append(runErrors, ctx.Err())
			break Loop
		default:
		}

		if obj.Debug {
			obj.Logf("recurse(%d) start", i)
		}
		it, err := x.Recurse(ctx, scanner.Scan)
		if obj.Debug {
			obj.Logf("recurse(%d) done", i)
		}
		if err != nil {
			if obj.ShutdownOnError {
				// this will trigger the ctx cancel() in defer
				// NOTE(review): the deferred scanner.Result()
				// calls run before the deferred cancel(), so
				// pending scans appear to be waited on before
				// ctx is cancelled -- confirm this is intended.
				return nil, errwrap.Wrapf(err, "recurse error with: %s", x)
			}
			runErrors = append(runErrors, err)
			continue // no job is sent for this iterator
		}
		// don't unlock here in case something is running in parallel...

		// We wait until *after* recurse has finished running before we
		// send the signal on the channel, because once we do, the
		// results method of the scanner will be run, which we should
		// only do *after* the results are ready.
		select {
		case jobs <- &job{index: i, iterator: x, scanner: scanner}: // send
		case <-ctx.Done():
			runErrors = append(runErrors, ctx.Err())
			break Loop // cancelled: stop iterating, not just the select
		}

		iterators = append(iterators, it...)
	}
	closeFn() // done sending on channel

	wg.Wait()                                      // wait for goroutine to exit
	runErrors = append(runErrors, resultErrors...) // from the goroutine

	if len(runErrors) > 0 {
		var ea error
		for _, e := range runErrors {
			ea = errwrap.Append(ea, e)
		}
		return nil, errwrap.Wrapf(ea, "core run errored")
	}

	return allResultSets, nil
}