in banyand/stream/query_by_idx.go [184:248]
// loadSortingData pulls the next batch of entries from qr.sortingIter and
// loads the corresponding elements through qr.load.
//
// An entry is kept when qr.qo.elementFilter is nil or contains the entry's
// DocID. For every kept entry the method:
//   - inserts the DocID into a fresh posting list used as the batch filter,
//   - appends the DocID to qr.elementIDsSorted (reset at the start of the
//     call), preserving iterator order,
//   - widens the batch's minTimestamp/maxTimestamp to include the entry,
//   - records the SeriesID in sortedSids, kept ascending without duplicates.
//
// Iteration stops when the iterator is exhausted or once the number of kept
// entries reaches MaxElementSize; the check runs after an entry is kept, so
// a non-positive MaxElementSize still yields one entry if any matches.
//
// It returns nil when no entry was kept, otherwise the result of qr.load
// called with the options assembled for this batch. When a tracer is present
// in ctx, a "load-sorting-data" span is tagged with the limit, the filter
// size, the number of entries visited and the number kept.
func (qr *idxResult) loadSortingData(ctx context.Context) *model.StreamResult {
	var qo queryOptions
	qo.StreamQueryOptions = qr.qo.StreamQueryOptions
	qo.elementFilter = roaring.NewPostingList()
	qo.seriesToEntity = qr.qo.seriesToEntity
	qr.elementIDsSorted = qr.elementIDsSorted[:0]

	// count is the number of entries kept in this batch; searchedSize is the
	// number of entries visited, including those rejected by the filter.
	count, searchedSize := 0, 0

	tracer := query.GetTracer(ctx)
	if tracer != nil {
		// NOTE(review): the context returned by StartSpan is discarded, so
		// qr.load below receives the caller's ctx — confirm this is intended.
		span, _ := tracer.StartSpan(ctx, "load-sorting-data")
		span.Tagf("max_element_size", "%d", qo.MaxElementSize)
		if qr.qo.elementFilter != nil {
			span.Tag("filter_size", fmt.Sprintf("%d", qr.qo.elementFilter.Len()))
		}
		// Deferred so the tags reflect the final counters and the span also
		// covers the qr.load call made in the return statement.
		defer func() {
			span.Tagf("searched_size", "%d", searchedSize)
			span.Tagf("count", "%d", count)
			span.Stop()
		}()
	}

	for qr.sortingIter.Next() {
		searchedSize++
		val := qr.sortingIter.Val()
		if qr.qo.elementFilter != nil && !qr.qo.elementFilter.Contains(val.DocID) {
			continue
		}
		count++

		qo.elementFilter.Insert(val.DocID)
		if val.Timestamp > qo.maxTimestamp {
			qo.maxTimestamp = val.Timestamp
		}
		// A zero minTimestamp is treated as "not set yet".
		if val.Timestamp < qo.minTimestamp || qo.minTimestamp == 0 {
			qo.minTimestamp = val.Timestamp
		}
		qr.elementIDsSorted = append(qr.elementIDsSorted, val.DocID)

		// Sorted insert: locate the first slot whose series ID is not
		// smaller than the new one; it is either the ID itself (nothing to
		// do) or the position the new ID must occupy.
		pos := len(qo.sortedSids)
		for i, sid := range qo.sortedSids {
			if sid >= val.SeriesID {
				pos = i
				break
			}
		}
		if pos == len(qo.sortedSids) || qo.sortedSids[pos] != val.SeriesID {
			// Grow by one, shift the tail one slot to the right, then fill
			// the gap. copy handles the overlapping ranges correctly.
			qo.sortedSids = append(qo.sortedSids, val.SeriesID)
			copy(qo.sortedSids[pos+1:], qo.sortedSids[pos:])
			qo.sortedSids[pos] = val.SeriesID
		}

		if count >= qo.MaxElementSize {
			break
		}
	}

	if qo.elementFilter.IsEmpty() {
		return nil
	}
	return qr.load(ctx, qo)
}