in atlas-pekko/src/main/scala/com/netflix/atlas/pekko/StreamOps.scala [323:378]
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new GraphStageLogic(shape) with InHandler with OutHandler {
import MonitorFlow.*
private var lastUpdate = registry.clock().monotonicTime()
private val numEventsUpdater = numEvents.batchUpdater(MeterBatchSize)
private val upstreamUpdater = upstreamTimer.batchUpdater(MeterBatchSize)
private val downstreamUpdater = downstreamTimer.batchUpdater(MeterBatchSize)
private var upstreamStart = -1L
private var downstreamStart = -1L
override def onPush(): Unit = {
val now = registry.clock().monotonicTime()
numEventsUpdater.increment()
if (upstreamStart != -1L) {
upstreamUpdater.record(now - upstreamStart, TimeUnit.NANOSECONDS)
upstreamStart = -1L
}
push(out, grab(in))
downstreamStart = now
if (now - lastUpdate > MeterUpdateInterval) {
updateMeters(now)
}
}
override def onPull(): Unit = {
val now = registry.clock().monotonicTime()
if (downstreamStart != -1L) {
downstreamUpdater.record(now - downstreamStart, TimeUnit.NANOSECONDS)
downstreamStart = -1L
}
pull(in)
upstreamStart = now
}
override def onUpstreamFinish(): Unit = {
updateMeters(registry.clock().monotonicTime())
numEventsUpdater.close()
upstreamUpdater.close()
downstreamUpdater.close()
super.onUpstreamFinish()
}
private def updateMeters(now: Long): Unit = {
numEventsUpdater.flush()
upstreamUpdater.flush()
downstreamUpdater.flush()
lastUpdate = now
}
setHandlers(in, out, this)
}
}