in atlas-core/src/main/scala/com/netflix/atlas/core/db/AggregateCollector.scala [99:135]
def add(
tags: Map[String, String],
blocks: List[Block],
aggr: Int,
cf: ConsolidationFunction,
multiple: Int,
newBuffer: Map[String, String] => TimeSeriesBuffer
): Int = {
if (buffer == null) {
buffer = newBuffer(tags)
statBuffer.updateOutput(buffer.values.length)
if (cf == ConsolidationFunction.Avg && multiple > 1) {
valueMask = new JBitSet(buffer.values.length * multiple)
valueMultiple = multiple
}
}
statBuffer.updateInput(blocks)
val op = aggr match {
case Block.Sum => Math.addNaN _
case Block.Count => Math.addNaN _
case Block.Min => Math.minNaN _
case Block.Max => Math.maxNaN _
}
blocks.foreach { b =>
if (valueMask != null) {
val v = buffer.aggrBlock(b, aggr, ConsolidationFunction.Sum, multiple, op)
buffer.valueMask(valueMask, b, multiple)
valueCount += v
} else {
val v = buffer.aggrBlock(b, aggr, cf, multiple, op)
valueCount += v
}
}
buffer.values.length
}