def add()

in atlas-core/src/main/scala/com/netflix/atlas/core/db/AggregateCollector.scala [99:135]


  def add(
    tags: Map[String, String],
    blocks: List[Block],
    aggr: Int,
    cf: ConsolidationFunction,
    multiple: Int,
    newBuffer: Map[String, String] => TimeSeriesBuffer
  ): Int = {
    if (buffer == null) {
      buffer = newBuffer(tags)
      statBuffer.updateOutput(buffer.values.length)
      if (cf == ConsolidationFunction.Avg && multiple > 1) {
        valueMask = new JBitSet(buffer.values.length * multiple)
        valueMultiple = multiple
      }
    }
    statBuffer.updateInput(blocks)

    val op = aggr match {
      case Block.Sum   => Math.addNaN _
      case Block.Count => Math.addNaN _
      case Block.Min   => Math.minNaN _
      case Block.Max   => Math.maxNaN _
    }

    blocks.foreach { b =>
      if (valueMask != null) {
        val v = buffer.aggrBlock(b, aggr, ConsolidationFunction.Sum, multiple, op)
        buffer.valueMask(valueMask, b, multiple)
        valueCount += v
      } else {
        val v = buffer.aggrBlock(b, aggr, cf, multiple, op)
        valueCount += v
      }
    }
    buffer.values.length
  }