in atlas-core/src/main/scala/com/netflix/atlas/core/db/MemoryDatabase.scala [171:225]
private def executeImpl(context: EvalContext, expr: DataExpr): List[TimeSeries] = {
val cfStep = context.step
require(cfStep >= step, "step for query must be >= step for the database")
require(cfStep % step == 0, "consolidated step must be multiple of db step")
val query = TagQuery(Some(expr.query))
val aggr = blockAggr(expr)
val collector = AggregateCollector(expr)
val end = context.end
val multiple = (cfStep / step).asInstanceOf[Int]
val s = context.start / cfStep
val e = end / cfStep
val bs = s * multiple
val be = e * multiple
val stepLength = be - bs + 1
val cfStepLength = stepLength / multiple
val bufStart = bs * step
val bufEnd = bufStart + cfStepLength * cfStep - cfStep
def newBuffer(tags: Map[String, String]): TimeSeriesBuffer = {
TimeSeriesBuffer(tags, cfStep, bufStart, bufEnd)
}
index.findItems(query).foreach { item =>
item.blocks.blockList.foreach { b =>
queryBlocks.increment()
// Check if the block has data for the desired time range
val blockEnd = b.start + (b.size + 1) * step
if (b.start <= be * step && blockEnd >= bs * step) {
aggrBlocks.increment()
collector.add(item.tags, List(b), aggr, expr.cf, multiple, newBuffer)
}
}
}
val stats = collector.stats
queryMetrics.increment(stats.inputLines)
queryLines.increment(stats.outputLines)
queryInputDatapoints.increment(stats.inputDatapoints)
queryOutputDatapoints.increment(stats.outputDatapoints)
val resultKeys = Query.exactKeys(expr.query) ++ expr.finalGrouping
val vs = collector.result
.map { t =>
val resultTags = expr match {
case _: DataExpr.All => t.tags
case _ => t.tags.filter(t => resultKeys.contains(t._1))
}
DataExpr.withDefaultLabel(expr, t.withTags(resultTags))
}
.sortWith { _.label < _.label }
finalValues(context, expr, vs)
}