func()

in shards/shards.go [217:336]


func (ss *shardedSearcher) Search(ctx context.Context, q query.Q, opts *zoekt.SearchOptions) (sr *zoekt.SearchResult, err error) {
	tr := trace.New("shardedSearcher.Search", "")
	tr.LazyLog(q, true)
	tr.LazyPrintf("opts: %+v", opts)
	overallStart := time.Now()
	metricSearchRunning.Inc()
	defer func() {
		metricSearchRunning.Dec()
		metricSearchDuration.Observe(time.Since(overallStart).Seconds())
		if sr != nil {
			metricSearchContentBytesLoadedTotal.Add(float64(sr.Stats.ContentBytesLoaded))
			metricSearchIndexBytesLoadedTotal.Add(float64(sr.Stats.IndexBytesLoaded))
			metricSearchCrashesTotal.Add(float64(sr.Stats.Crashes))
			metricSearchFileCountTotal.Add(float64(sr.Stats.FileCount))
			metricSearchShardFilesConsideredTotal.Add(float64(sr.Stats.ShardFilesConsidered))
			metricSearchFilesConsideredTotal.Add(float64(sr.Stats.FilesConsidered))
			metricSearchFilesLoadedTotal.Add(float64(sr.Stats.FilesLoaded))
			metricSearchFilesSkippedTotal.Add(float64(sr.Stats.FilesSkipped))
			metricSearchShardsSkippedTotal.Add(float64(sr.Stats.ShardsSkipped))
			metricSearchMatchCountTotal.Add(float64(sr.Stats.MatchCount))
			metricSearchNgramMatchesTotal.Add(float64(sr.Stats.NgramMatches))

			tr.LazyPrintf("num files: %d", len(sr.Files))
			tr.LazyPrintf("stats: %+v", sr.Stats)
		}
		if err != nil {
			metricSearchFailedTotal.Inc()

			tr.LazyPrintf("error: %v", err)
			tr.SetError()
		}
		tr.Finish()
	}()

	start := time.Now()

	aggregate := &zoekt.SearchResult{
		RepoURLs:      map[string]string{},
		LineFragments: map[string]string{},
	}

	// This critical section is large, but we don't want to deal with
	// searches on shards that have just been closed.
	if err := ss.rlock(ctx); err != nil {
		return aggregate, err
	}
	defer ss.runlock()
	tr.LazyPrintf("acquired lock")
	aggregate.Wait = time.Since(start)
	start = time.Now()

	shards := ss.getShards()
	all := make(chan shardResult, len(shards))

	var childCtx context.Context
	var cancel context.CancelFunc
	if opts.MaxWallTime == 0 {
		childCtx, cancel = context.WithCancel(ctx)
	} else {
		childCtx, cancel = context.WithTimeout(ctx, opts.MaxWallTime)
	}

	defer cancel()

	// For each query, throttle the number of parallel
	// actions. Since searching is mostly CPU bound, we limit the
	// number of parallel searches. This reduces the peak working
	// set, which hopefully stops https://cs.bazel.build from crashing
	// when looking for the string "com".
	feeder := make(chan zoekt.Searcher, len(shards))
	for _, s := range shards {
		feeder <- s
	}
	close(feeder)
	for i := 0; i < runtime.GOMAXPROCS(0); i++ {
		go func() {
			for s := range feeder {
				searchOneShard(childCtx, s, q, opts, all)
			}
		}()
	}

	for range shards {
		r := <-all
		if r.err != nil {
			return nil, r.err
		}
		aggregate.Files = append(aggregate.Files, r.sr.Files...)
		aggregate.Stats.Add(r.sr.Stats)

		if len(r.sr.Files) > 0 {
			for k, v := range r.sr.RepoURLs {
				aggregate.RepoURLs[k] = v
			}
			for k, v := range r.sr.LineFragments {
				aggregate.LineFragments[k] = v
			}
		}

		if cancel != nil && opts.TotalMaxMatchCount > 0 && aggregate.Stats.MatchCount > opts.TotalMaxMatchCount {
			cancel()
			cancel = nil
		}
	}

	zoekt.SortFilesByScore(aggregate.Files)
	if max := opts.MaxDocDisplayCount; max > 0 && len(aggregate.Files) > max {
		aggregate.Files = aggregate.Files[:max]
	}
	for i := range aggregate.Files {
		copySlice(&aggregate.Files[i].Content)
		copySlice(&aggregate.Files[i].Checksum)
		for l := range aggregate.Files[i].LineMatches {
			copySlice(&aggregate.Files[i].LineMatches[l].Line)
		}
	}

	aggregate.Duration = time.Since(start)
	return aggregate, nil
}