in stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala [1905:2022]
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with InHandler with OutHandler {
import Delay._
private[this] val size = inheritedAttributes.mandatoryAttribute[InputBuffer].max
private[this] val delayStrategy = delayStrategySupplier()
// buffer has pairs of timestamp of expected push and element
private[this] val buffer = BufferImpl[(Long, T)](size, inheritedAttributes)
private[this] val onPushWhenBufferFull: () => Unit = overflowStrategy match {
case EmitEarly =>
() => {
if (isAvailable(out)) {
if (isTimerActive(TimerName)) {
cancelTimer(TimerName)
}
push(out, buffer.dequeue()._2)
grabAndPull()
completeIfReady()
} else {
throw new IllegalStateException(
"Was configured to emitEarly and got element when out is not ready and buffer is full, should not be possible.")
}
}
case _: DropHead =>
() => {
buffer.dropHead()
grabAndPull()
}
case _: DropTail =>
() => {
buffer.dropTail()
grabAndPull()
}
case _: DropNew =>
() => {
grab(in)
if (shouldPull) pull(in)
}
case _: DropBuffer =>
() => {
buffer.clear()
grabAndPull()
}
case _: Fail =>
() => {
failStage(new BufferOverflowException(s"Buffer overflow for delay operator (max capacity was: $size)!"))
}
case _: Backpressure =>
() => {
throw new IllegalStateException("Delay buffer must never overflow in Backpressure mode")
}
}
def onPush(): Unit = {
if (buffer.isFull)
onPushWhenBufferFull()
else {
grabAndPull()
if (!isTimerActive(TimerName)) {
val waitTime = nextElementWaitTime()
if (waitTime <= DelayPrecisionMS && isAvailable(out)) {
push(out, buffer.dequeue()._2)
completeIfReady()
} else
scheduleOnce(TimerName, waitTime.millis)
}
}
}
private def shouldPull: Boolean =
buffer.used < size || !overflowStrategy.isBackpressure ||
// we can only emit early if output is ready
(overflowStrategy == EmitEarly && isAvailable(out))
private def grabAndPull(): Unit = {
val element = grab(in)
buffer.enqueue((System.nanoTime() + delayStrategy.nextDelay(element).toNanos, element))
if (shouldPull) pull(in)
}
override def onUpstreamFinish(): Unit =
completeIfReady()
def onPull(): Unit = {
if (!isTimerActive(TimerName) && !buffer.isEmpty) {
val waitTime = nextElementWaitTime()
if (waitTime <= DelayPrecisionMS)
push(out, buffer.dequeue()._2)
else
scheduleOnce(TimerName, waitTime.millis)
}
if (!isClosed(in) && !hasBeenPulled(in) && shouldPull)
pull(in)
completeIfReady()
}
setHandler(in, this)
setHandler(out, this)
def completeIfReady(): Unit = if (isClosed(in) && buffer.isEmpty) completeStage()
private def nextElementWaitTime(): Long = {
NANOSECONDS.toMillis(buffer.peek()._1 - System.nanoTime())
}
final override protected def onTimer(key: Any): Unit = {
if (isAvailable(out))
push(out, buffer.dequeue()._2)
completeIfReady()
}
}