override def createLogic()

in stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala [1905:2022]


  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) with InHandler with OutHandler {
      import Delay._

      // Capacity of the delay buffer, taken from the `max` of the mandatory InputBuffer attribute.
      private[this] val size = inheritedAttributes.mandatoryAttribute[InputBuffer].max

      // Strategy obtained by invoking the supplier once while this logic is being constructed.
      private[this] val delayStrategy = delayStrategySupplier()

      // Each entry pairs the timestamp (a System.nanoTime() based value) at which the element is
      // expected to be pushed with the element itself.
      private[this] val buffer = BufferImpl[(Long, T)](size, inheritedAttributes)

      /**
       * Action executed by `onPush` when an element arrives while the buffer is full.
       * It is selected once, from the configured `overflowStrategy`, when this logic is constructed.
       */
      private[this] val onPushWhenBufferFull: () => Unit = {
        // Shared shape of the dropping strategies: make room first, then buffer the new element.
        def evictThenBuffer(evict: () => Unit): () => Unit =
          () => {
            evict()
            grabAndPull()
          }

        overflowStrategy match {
          case EmitEarly =>
            () => {
              // Guard first: without downstream demand the head cannot be emitted ahead of time.
              if (!isAvailable(out))
                throw new IllegalStateException(
                  "Was configured to emitEarly and got element when out is not ready and buffer is full, should not be possible.")

              // The head is emitted right now, so a pending timer is no longer needed.
              if (isTimerActive(TimerName)) cancelTimer(TimerName)

              push(out, buffer.dequeue()._2)
              grabAndPull()
              completeIfReady()
            }
          case _: DropHead =>
            evictThenBuffer(() => buffer.dropHead())
          case _: DropTail =>
            evictThenBuffer(() => buffer.dropTail())
          case _: DropNew =>
            () => {
              // The incoming element is grabbed only to discard it; the buffer is left untouched.
              grab(in)
              if (shouldPull) pull(in)
            }
          case _: DropBuffer =>
            evictThenBuffer(() => buffer.clear())
          case _: Fail =>
            () =>
              failStage(new BufferOverflowException(s"Buffer overflow for delay operator (max capacity was: $size)!"))
          case _: Backpressure =>
            () => throw new IllegalStateException("Delay buffer must never overflow in Backpressure mode")
        }
      }

      /**
       * Handles an element arriving from upstream.
       *
       * With a full buffer the pre-selected overflow action runs. Otherwise the element is buffered
       * and, unless a timer is already pending, the buffer head is either pushed immediately
       * (its remaining wait is within `DelayPrecisionMS` and downstream has demand) or a timer
       * is scheduled for its remaining wait.
       */
      def onPush(): Unit =
        if (!buffer.isFull) {
          grabAndPull()
          if (!isTimerActive(TimerName)) {
            val remainingMs = nextElementWaitTime()
            val emitNow = remainingMs <= DelayPrecisionMS && isAvailable(out)
            if (emitNow) {
              push(out, buffer.dequeue()._2)
              completeIfReady()
            } else scheduleOnce(TimerName, remainingMs.millis)
          }
        } else onPushWhenBufferFull()

      /** Whether another element may be requested from upstream right now. */
      private def shouldPull: Boolean = {
        // Local defs (not vals) keep the original left-to-right, short-circuit evaluation.
        def hasRoom = buffer.used < size
        def overflowHandled = !overflowStrategy.isBackpressure
        // we can only emit early if output is ready
        def canEmitEarly = overflowStrategy == EmitEarly && isAvailable(out)

        hasRoom || overflowHandled || canEmitEarly
      }

      /**
       * Takes the pending element from `in` and buffers it together with its due time: the current
       * `System.nanoTime()` plus the delay the strategy returns for this element. Afterwards `in`
       * is pulled again if `shouldPull` allows it.
       */
      private def grabAndPull(): Unit = {
        val received = grab(in)
        val enqueuedAt = System.nanoTime()
        val dueAt = enqueuedAt + delayStrategy.nextDelay(received).toNanos
        buffer.enqueue((dueAt, received))
        if (shouldPull) pull(in)
      }

      /**
       * Upstream has finished. The stage completes right away only when nothing is buffered;
       * otherwise it stays open and the later callbacks re-check completion after emitting.
       */
      override def onUpstreamFinish(): Unit = {
        completeIfReady()
      }

      /**
       * Handles demand from downstream.
       *
       * If no timer is pending and the buffer holds elements, the head is pushed when its remaining
       * wait is within `DelayPrecisionMS`; otherwise a timer is scheduled for that remaining wait.
       * Upstream is then pulled if it is still open, has not been pulled yet and `shouldPull`
       * allows it. Finally the stage completes if upstream is closed and the buffer is empty.
       */
      def onPull(): Unit = {
        val headUnscheduled = !isTimerActive(TimerName) && !buffer.isEmpty
        if (headUnscheduled) {
          val remainingMs = nextElementWaitTime()
          if (remainingMs > DelayPrecisionMS)
            scheduleOnce(TimerName, remainingMs.millis)
          else
            push(out, buffer.dequeue()._2)
        }

        val upstreamIdle = !isClosed(in) && !hasBeenPulled(in)
        if (upstreamIdle && shouldPull)
          pull(in)

        completeIfReady()
      }

      // This logic object mixes in both InHandler and OutHandler, so it registers itself on both ports.
      setHandler(in, this)
      setHandler(out, this)

      /** Completes the stage once upstream is closed and every buffered element has been emitted. */
      def completeIfReady(): Unit = {
        val drained = isClosed(in) && buffer.isEmpty
        if (drained) completeStage()
      }

      /**
       * Milliseconds left until the head of the buffer is due, computed from its stored timestamp
       * and the current `System.nanoTime()`. The result is zero or negative once the head is due.
       * Must only be called while the buffer is non-empty.
       */
      private def nextElementWaitTime(): Long = {
        val dueAt = buffer.peek()._1
        val remainingNanos = dueAt - System.nanoTime()
        NANOSECONDS.toMillis(remainingNanos)
      }

      /**
       * Timer callback. `TimerName` is the only key this logic schedules, so `key` is not inspected.
       *
       * The timer is armed for whichever element was at the head of the buffer when it was
       * scheduled. The DropHead and DropBuffer overflow actions can replace that head while the
       * timer keeps running, so the head seen here may not be the element the timer was meant for.
       * Its own remaining wait is therefore re-checked: the head is pushed only when it is due
       * (within `DelayPrecisionMS`), otherwise the timer is re-armed for the remaining wait so
       * the element is not emitted before its delay has elapsed.
       *
       * When downstream has no demand nothing is emitted here; the next `onPull` takes care of
       * the head instead.
       */
      final override protected def onTimer(key: Any): Unit = {
        // The emptiness check is purely defensive: peeking or dequeuing needs a head element.
        if (isAvailable(out) && !buffer.isEmpty) {
          val waitTime = nextElementWaitTime()
          if (waitTime <= DelayPrecisionMS)
            push(out, buffer.dequeue()._2)
          else
            scheduleOnce(TimerName, waitTime.millis)
        }

        completeIfReady()
      }
    }