in banyand/query/processor_topn.go [465:504]
func (naggr *postNonAggregationProcessor) put(entityValues tsdb.EntityValues, val int64, timestampMillis uint64) error {
key := entityValues.String()
if timeline, ok := naggr.timelines[timestampMillis]; ok {
if timeline.Len() < int(naggr.topN) {
heap.Push(timeline, &nonAggregatorItem{val: val, key: key, values: entityValues})
} else {
if lowest := timeline.Peek(); lowest != nil {
if naggr.sort == modelv1.Sort_SORT_DESC && lowest.(*nonAggregatorItem).val < val {
timeline.ReplaceLowest(&nonAggregatorItem{val: val, key: key, values: entityValues})
} else if naggr.sort != modelv1.Sort_SORT_DESC && lowest.(*nonAggregatorItem).val > val {
timeline.ReplaceLowest(&nonAggregatorItem{val: val, key: key, values: entityValues})
}
}
}
return nil
}
timeline := flow.NewPriorityQueue(func(a, b interface{}) int {
if naggr.sort == modelv1.Sort_SORT_DESC {
if a.(*nonAggregatorItem).val < b.(*nonAggregatorItem).val {
return -1
} else if a.(*nonAggregatorItem).val == b.(*nonAggregatorItem).val {
return 0
} else {
return 1
}
}
if a.(*nonAggregatorItem).val < b.(*nonAggregatorItem).val {
return 1
} else if a.(*nonAggregatorItem).val == b.(*nonAggregatorItem).val {
return 0
} else {
return -1
}
}, false)
naggr.timelines[timestampMillis] = timeline
heap.Push(timeline, &nonAggregatorItem{val: val, key: key, values: entityValues})
return nil
}