in shards/shards.go [217:336]
func (ss *shardedSearcher) Search(ctx context.Context, q query.Q, opts *zoekt.SearchOptions) (sr *zoekt.SearchResult, err error) {
	tr := trace.New("shardedSearcher.Search", "")
	tr.LazyLog(q, true)
	tr.LazyPrintf("opts: %+v", opts)

	overallStart := time.Now()
	metricSearchRunning.Inc()
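	// Record per-search metrics and finish the trace once the search returns.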
	defer func() {
		metricSearchRunning.Dec()
		metricSearchDuration.Observe(time.Since(overallStart).Seconds())
		if sr != nil {
			metricSearchContentBytesLoadedTotal.Add(float64(sr.Stats.ContentBytesLoaded))
			metricSearchIndexBytesLoadedTotal.Add(float64(sr.Stats.IndexBytesLoaded))
			metricSearchCrashesTotal.Add(float64(sr.Stats.Crashes))
			metricSearchFileCountTotal.Add(float64(sr.Stats.FileCount))
			metricSearchShardFilesConsideredTotal.Add(float64(sr.Stats.ShardFilesConsidered))
			metricSearchFilesConsideredTotal.Add(float64(sr.Stats.FilesConsidered))
			metricSearchFilesLoadedTotal.Add(float64(sr.Stats.FilesLoaded))
			metricSearchFilesSkippedTotal.Add(float64(sr.Stats.FilesSkipped))
			metricSearchShardsSkippedTotal.Add(float64(sr.Stats.ShardsSkipped))
			metricSearchMatchCountTotal.Add(float64(sr.Stats.MatchCount))
			metricSearchNgramMatchesTotal.Add(float64(sr.Stats.NgramMatches))

			tr.LazyPrintf("num files: %d", len(sr.Files))
			tr.LazyPrintf("stats: %+v", sr.Stats)
		}
		if err != nil {
			metricSearchFailedTotal.Inc()

			tr.LazyPrintf("error: %v", err)
			tr.SetError()
		}
		tr.Finish()
	}()

	start := time.Now()
	aggregate := &zoekt.SearchResult{
		RepoURLs:      map[string]string{},
		LineFragments: map[string]string{},
	}

	// This critical section is large, but we don't want to deal with
	// searches on shards that have just been closed.
	if err := ss.rlock(ctx); err != nil {
		return aggregate, err
	}
	defer ss.runlock()
	tr.LazyPrintf("acquired lock")

	aggregate.Wait = time.Since(start)
	start = time.Now()

	shards := ss.getShards()
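	// Each shard reports exactly one shardResult; the channel is buffered to
	// len(shards) so worker goroutines never block on send.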
	all := make(chan shardResult, len(shards))

	var childCtx context.Context
	var cancel context.CancelFunc
	if opts.MaxWallTime == 0 {
		childCtx, cancel = context.WithCancel(ctx)
	} else {
		childCtx, cancel = context.WithTimeout(ctx, opts.MaxWallTime)
	}

	defer cancel()

	// For each query, throttle the number of parallel
	// actions. Since searching is mostly CPU bound, we limit the
	// number of parallel searches. This reduces the peak working
	// set, which hopefully stops https://cs.bazel.build from crashing
	// when looking for the string "com".
	feeder := make(chan zoekt.Searcher, len(shards))
	for _, s := range shards {
		feeder <- s
	}
	close(feeder)
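	// One worker per CPU drains the feeder until every shard has been searched.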
	for i := 0; i < runtime.GOMAXPROCS(0); i++ {
		go func() {
			for s := range feeder {
				searchOneShard(childCtx, s, q, opts, all)
			}
		}()
	}
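	// Merge the per-shard results. Once enough matches have been collected,
	// cancel the remaining shard searches via the child context.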
	for range shards {
		r := <-all
		if r.err != nil {
			return nil, r.err
		}
		aggregate.Files = append(aggregate.Files, r.sr.Files...)
		aggregate.Stats.Add(r.sr.Stats)

		if len(r.sr.Files) > 0 {
			for k, v := range r.sr.RepoURLs {
				aggregate.RepoURLs[k] = v
			}

			for k, v := range r.sr.LineFragments {
				aggregate.LineFragments[k] = v
			}
		}

		if cancel != nil && opts.TotalMaxMatchCount > 0 && aggregate.Stats.MatchCount > opts.TotalMaxMatchCount {
			cancel()
			cancel = nil
		}
	}
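	// Rank the combined results and trim to the requested display limit.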
	zoekt.SortFilesByScore(aggregate.Files)
	if max := opts.MaxDocDisplayCount; max > 0 && len(aggregate.Files) > max {
		aggregate.Files = aggregate.Files[:max]
	}
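	// Copy the returned byte slices so the results no longer alias shard-backed
	// (memory-mapped) data, which may go away once the read lock is released.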
	for i := range aggregate.Files {
		copySlice(&aggregate.Files[i].Content)
		copySlice(&aggregate.Files[i].Checksum)
		for l := range aggregate.Files[i].LineMatches {
			copySlice(&aggregate.Files[i].LineMatches[l].Line)
		}
	}

	aggregate.Duration = time.Since(start)

	return aggregate, nil
}