in atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/TimeGrouped.scala [77:219]
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new GraphStageLogic(shape) with InHandler with OutHandler {
// Batching wrappers around the stage counters. They are incremented in `onPush`,
// flushed whenever a buffer is flushed, and closed when upstream finishes.
// NOTE(review): 10_000 is presumably the batch size -- confirm against the counter API.
private val droppedOldUpdater = droppedOld.batchUpdater(10_000)
private val droppedFutureUpdater = droppedFuture.batchUpdater(10_000)
private val bufferedUpdater = buffered.batchUpdater(10_000)

// One aggregation map per time window that can be open at the same time.
private val buf = Array.fill[AggrMap](numBuffers)(new AggrMap)

// `timestamps(i)` is the time window currently held in `buf(i)`. Entries start at 0,
// which the flush logic treats as "nothing buffered yet".
private val timestamps = new Array[Long](numBuffers)

// Step of the most recently seen datapoint, -1 until the first datapoint arrives.
private var step = -1L

// Timestamp of the last buffer that was flushed. Datapoints at or before this time
// are dropped as too old.
private var cutoffTime = 0L

// Groups still to be emitted once upstream has finished.
private var pending: List[TimeGroup] = Nil
/**
  * Locate the buffer that holds data for timestamp `t`.
  *
  * @param t
  *     Timestamp of the datapoint being processed.
  * @return
  *     The index of the buffer whose timestamp equals `t` if there is one. Otherwise
  *     a negative value `-(k + 1)` where `k` is the index of the buffer with the
  *     smallest timestamp, i.e., the oldest window that should be flushed and reused.
  *     When several buffers share the smallest timestamp the lowest index is used.
  */
private def findBuffer(t: Long): Int = {
  var i = 0
  var min = 0
  while (i < timestamps.length) {
    if (timestamps(i) == t) return i
    // Track the overall minimum by comparing with the smallest value seen so far.
    // Comparing with the previous element instead would select the last position
    // where the sequence decreases, which can evict a newer window when there are
    // more than two buffers.
    if (i > 0 && timestamps(i) < timestamps(min)) min = i
    i += 1
  }
  -min - 1
}
/**
  * Fold a datapoint into the buffer at index `i`. The first datapoint seen for an
  * expression creates a new aggregator seeded with that datapoint, later ones are
  * merged into the existing aggregator. Heartbeats carry no data and are skipped,
  * they only exist to trigger flushing of the time group.
  */
private def aggregate(i: Int, v: AggrDatapoint): Unit = {
  if (!v.isHeartbeat) {
    val aggregators = buf(i)
    aggregators.get(v.expr) match {
      case null     => aggregators.put(v.expr, AggrDatapoint.newAggregator(v, aggrSettings))
      case existing => existing.aggregate(v)
    }
  }
}
/**
  * Close out the buffer at index `i` so it can be reused for a new time window.
  * The pending counter batches are flushed, the cutoff time is moved to the
  * timestamp of this buffer, and the buffer is replaced with an empty map. The
  * entry in `timestamps` is left untouched, the caller is expected to update it.
  *
  * @return
  *     The time group for the buffer if it had a timestamp greater than 0,
  *     otherwise `None`.
  */
private def flush(i: Int): Option[TimeGroup] = {
  droppedOldUpdater.flush()
  droppedFutureUpdater.flush()
  bufferedUpdater.flush()

  val ts = timestamps(i)
  // Build the group before the buffer is swapped out below
  val completed = Option.when(ts > 0)(toTimeGroup(ts, buf(i)))
  cutoffTime = ts
  buf(i) = new AggrMap
  completed
}
/**
  * Convert the aggregators collected for a time window into a `TimeGroup`.
  * Expressions whose aggregator reports that a limit was exceeded are reported via
  * `context.logDatapointsExceeded` and left out of the result.
  *
  * @param ts
  *     Timestamp for the time window.
  * @param aggrMap
  *     Aggregators for the window keyed by expression.
  * @return
  *     Group for `ts` using the current `step` with the values of the remaining
  *     expressions.
  */
private def toTimeGroup(ts: Long, aggrMap: AggrMap): TimeGroup = {
  import scala.jdk.CollectionConverters.*

  // First pass: report and remove the expressions that went over the limit
  val withinLimits = aggrMap.asScala.filter {
    case (expr, aggr) =>
      val exceeded = aggr.limitExceeded
      if (exceeded) {
        context.logDatapointsExceeded(ts, expr.toString)
      }
      !exceeded
  }

  // Second pass: extract the aggregated values for what is left
  val valuesByExpr = withinLimits.map {
    case (expr, aggr) => expr -> AggrValuesInfo(aggr.datapoints, aggr.numInputDatapoints)
  }.toMap

  TimeGroup(ts, step, valuesByExpr)
}
/**
  * Process a batch of datapoints from upstream. Each datapoint is either dropped
  * because it is in the future relative to the wall clock, dropped because it is at
  * or before the cutoff time, or aggregated into the buffer for its timestamp. When
  * no buffer matches, the one selected by `findBuffer` is flushed and reused. If
  * any groups were completed or the input carried messages they are pushed
  * downstream, otherwise more input is requested.
  */
override def onPush(): Unit = {
  val completed = List.newBuilder[TimeGroup]
  val wallTime = clock.wallTime()
  val input = grab(in)

  input.data.foreach { dp =>
    val ts = dp.timestamp
    step = dp.step
    if (ts > wallTime) {
      droppedFutureUpdater.increment()
    } else if (ts > cutoffTime) {
      bufferedUpdater.increment()
      val idx = findBuffer(ts)
      if (idx < 0) {
        // No buffer for this timestamp yet, recycle the one indicated by the
        // encoded negative index
        val slot = -idx - 1
        completed ++= flush(slot)
        aggregate(slot, dp)
        timestamps(slot) = ts
      } else {
        aggregate(idx, dp)
      }
    } else if (!dp.isHeartbeat) {
      // Too old. Heartbeats are not counted as dropped data.
      droppedOldUpdater.increment()
    }
  }

  val groups = completed.result()
  val nothingToEmit = groups.isEmpty && input.messages.isEmpty
  if (nothingToEmit)
    pull(in)
  else
    push(out, TimeGroupsTuple(groups, input.messages))
}
/**
  * Downstream demand. While the input is still open more data is requested from
  * upstream. Once it has closed, the remaining groups are emitted instead.
  */
override def onPull(): Unit = {
  if (!isClosed(in))
    pull(in)
  else
    flushPending()
}
/**
  * Upstream has completed. Everything still sitting in the buffers is converted to
  * time groups, the ones with a timestamp greater than 0 are queued in ascending
  * time order, and an attempt is made to emit them. The counter updaters are closed
  * afterwards.
  */
override def onUpstreamFinish(): Unit = {
  val remaining = buf.indices.iterator
    .map(i => toTimeGroup(timestamps(i), buf(i)))
    .filter(_.timestamp > 0)
    .toList
  pending = remaining.sortBy(_.timestamp)
  flushPending()

  droppedOldUpdater.close()
  droppedFutureUpdater.close()
  bufferedUpdater.close()
}
/**
  * Emit the groups queued by `onUpstreamFinish` if downstream is ready for them and
  * complete the stage once nothing is left to send. If there are queued groups but
  * no demand yet, nothing happens and the call is repeated on the next pull.
  */
private def flushPending(): Unit = {
  pending match {
    case Nil =>
      completeStage()
    case groups if isAvailable(out) =>
      push(out, TimeGroupsTuple(groups))
      pending = Nil
      completeStage()
    case _ =>
      // Groups are queued but downstream has not requested data yet
  }
}
// This logic instance mixes in both InHandler and OutHandler, so it is registered as
// the handler for the inlet and the outlet.
setHandlers(in, out, this)
}
}