func()

in banyand/stream/query_by_idx.go [184:248]


func (qr *idxResult) loadSortingData(ctx context.Context) *model.StreamResult {
	var qo queryOptions
	qo.StreamQueryOptions = qr.qo.StreamQueryOptions
	qo.elementFilter = roaring.NewPostingList()
	qo.seriesToEntity = qr.qo.seriesToEntity
	qr.elementIDsSorted = qr.elementIDsSorted[:0]
	count, searchedSize := 1, 0
	tracer := query.GetTracer(ctx)
	if tracer != nil {
		span, _ := tracer.StartSpan(ctx, "load-sorting-data")
		span.Tagf("max_element_size", "%d", qo.MaxElementSize)
		if qr.qo.elementFilter != nil {
			span.Tag("filter_size", fmt.Sprintf("%d", qr.qo.elementFilter.Len()))
		}
		defer func() {
			span.Tagf("searched_size", "%d", searchedSize)
			span.Tagf("count", "%d", count)
			span.Stop()
		}()
	}
	for ; qr.sortingIter.Next(); count++ {
		searchedSize++
		val := qr.sortingIter.Val()
		if qr.qo.elementFilter != nil && !qr.qo.elementFilter.Contains(val.DocID) {
			count--
			continue
		}
		qo.elementFilter.Insert(val.DocID)
		if val.Timestamp > qo.maxTimestamp {
			qo.maxTimestamp = val.Timestamp
		}
		if val.Timestamp < qo.minTimestamp || qo.minTimestamp == 0 {
			qo.minTimestamp = val.Timestamp
		}
		qr.elementIDsSorted = append(qr.elementIDsSorted, val.DocID)

		// Insertion sort
		insertPos, found := -1, false
		for i, sid := range qo.sortedSids {
			if val.SeriesID == sid {
				found = true
				break
			}
			if val.SeriesID < sid {
				insertPos = i
				break
			}
		}

		if !found {
			if insertPos == -1 {
				qo.sortedSids = append(qo.sortedSids, val.SeriesID)
			} else {
				qo.sortedSids = append(qo.sortedSids[:insertPos], append([]common.SeriesID{val.SeriesID}, qo.sortedSids[insertPos:]...)...)
			}
		}
		if count >= qo.MaxElementSize {
			break
		}
	}
	if qo.elementFilter.IsEmpty() {
		return nil
	}
	return qr.load(ctx, qo)
}