func()

in banyand/query/processor_topn.go [465:504]


// put offers one (entity, value) sample to the queue kept for timestampMillis.
//
// When no queue exists for the timestamp yet, one is created, registered in
// naggr.timelines, and the sample is pushed without consulting topN. For an
// existing queue the sample is pushed while the queue holds fewer than topN
// items; once it is full, the sample replaces the queue's head only if it
// ranks better than that head for the configured sort direction (greater for
// SORT_DESC, smaller for any other sort value).
//
// The returned error is always nil.
func (naggr *postNonAggregationProcessor) put(entityValues tsdb.EntityValues, val int64, timestampMillis uint64) error {
	key := entityValues.String()
	// Built lazily so an item is only allocated when it is actually stored.
	makeItem := func() *nonAggregatorItem {
		return &nonAggregatorItem{val: val, key: key, values: entityValues}
	}

	queue, found := naggr.timelines[timestampMillis]
	if !found {
		// Under SORT_DESC smaller values compare as lower; for every other
		// sort value the sign is flipped so larger values compare as lower.
		byValue := func(a, b interface{}) int {
			left, right := a.(*nonAggregatorItem).val, b.(*nonAggregatorItem).val
			order := 0
			switch {
			case left < right:
				order = -1
			case left > right:
				order = 1
			}
			if naggr.sort != modelv1.Sort_SORT_DESC {
				order = -order
			}
			return order
		}
		fresh := flow.NewPriorityQueue(byValue, false)
		naggr.timelines[timestampMillis] = fresh
		heap.Push(fresh, makeItem())
		return nil
	}

	// Still below capacity: accept the sample unconditionally.
	if queue.Len() < int(naggr.topN) {
		heap.Push(queue, makeItem())
		return nil
	}

	// NOTE(review): Peek is assumed to return the element the comparator
	// ranks lowest, i.e. the current worst kept candidate — confirm in flow.
	head := queue.Peek()
	if head == nil {
		return nil
	}

	boundary := head.(*nonAggregatorItem).val
	descending := naggr.sort == modelv1.Sort_SORT_DESC
	if (descending && boundary < val) || (!descending && boundary > val) {
		queue.ReplaceLowest(makeItem())
	}
	return nil
}