def allowReadingIncomingFrames()

in http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Multiplexer.scala [87:323]


  def allowReadingIncomingFrames(allow: Boolean): Unit

  /** Called by the multiplexer when ready to send a data frame */
  def pullNextFrame(streamId: Int, maxSize: Int): PullFrameResult

  /** Called by the multiplexer after SETTINGS_INITIAL_WINDOW_SIZE has changed */
  def distributeWindowDeltaToAllStreams(delta: Int): Unit

  /** Called by the multiplexer before canceling the stage on outlet cancellation */
  def frameOutFinished(): Unit

  def pushFrameOut(event: FrameEvent): Unit

  def onAllDataFlushed(): Unit

  def createMultiplexer(prioritizer: StreamPrioritizer): Http2Multiplexer with OutHandler =
    new Http2Multiplexer with OutHandler with StateTimingSupport with LogHelper { self =>
      def log: LoggingAdapter = logic.log
      // cache debug state at the beginning to avoid that this has to be queried all the time
      override lazy val isDebugEnabled: Boolean = super.isDebugEnabled

      private var _currentInitialWindow: Int = Http2Protocol.InitialWindowSize
      override def currentInitialWindow: Int = _currentInitialWindow

      private var currentMaxFrameSize: Int = Http2Protocol.InitialMaxFrameSize
      private var connectionWindowLeft: Int = Http2Protocol.InitialWindowSize

      override def pushControlFrame(frame: FrameEvent): Unit = updateState(_.pushControlFrame(frame))

      def updateConnectionLevelWindow(increment: Int): Unit = {
        connectionWindowLeft += increment
        debug(s"Updating outgoing connection window by $increment to $connectionWindowLeft")
        updateState(_.connectionWindowAvailable())
      }
      override def updateMaxFrameSize(newMaxFrameSize: Int): Unit = currentMaxFrameSize = newMaxFrameSize
      override def updateDefaultWindow(newDefaultWindow: Int): Unit = {
        val delta = newDefaultWindow - _currentInitialWindow
        _currentInitialWindow = newDefaultWindow

        distributeWindowDeltaToAllStreams(delta)
      }
      override def updatePriority(info: PriorityFrame): Unit = prioritizer.updatePriority(info)

      def enqueueOutStream(streamId: Int): Unit = updateState(_.enqueueOutStream(streamId))
      def closeStream(streamId: Int): Unit = updateState(_.closeStream(streamId))

      /** Network pulls in new frames */
      def onPull(): Unit = updateState(_.onPull())

      override def onDownstreamFinish(): Unit = {
        frameOutFinished()
        super.onDownstreamFinish()
      }

      private var _state: MultiplexerState = Idle

      def hasFlushedAllData: Boolean = allDataFlushed(_state)
      private def allDataFlushed(state: MultiplexerState): Boolean = (state eq WaitingForData) || (state eq Idle)

      private val controlFrameBuffer: mutable.Queue[FrameEvent] = new mutable.Queue[FrameEvent]
      private val sendableOutstreams: mutable.Queue[Int] = new mutable.Queue[Int]
      private def enqueueStream(streamId: Int): Unit = {
        if (isDebugEnabled)
          require(!sendableOutstreams.contains(streamId), s"Stream [$streamId] was enqueued multiple times.") // requires expensive scanning -> avoid in production
        sendableOutstreams.enqueue(streamId)
      }
      private def dequeueStream(streamId: Int): Unit =
        sendableOutstreams -= streamId

      private def updateState(transition: MultiplexerState => MultiplexerState): Unit = {
        val oldState = _state
        val newState = transition(_state)
        _state = newState

        if (isDebugEnabled && newState.name != oldState.name) recordStateChange(oldState.name, newState.name)
        if (allDataFlushed(newState)) onAllDataFlushed()
        allowReadingIncomingFrames(controlFrameBuffer.size < settings.outgoingControlFrameBufferSize)
      }

      sealed trait MultiplexerState extends Product {
        def name: String = productPrefix

        def onPull(): MultiplexerState
        @nowarn("msg=references private")
        def pushControlFrame(frame: FrameEvent): MultiplexerState
        def connectionWindowAvailable(): MultiplexerState
        def enqueueOutStream(streamId: Int): MultiplexerState
        def closeStream(streamId: Int): MultiplexerState

        protected def sendDataFrame(streamId: Int): MultiplexerState = {
          val maxBytesToSend = currentMaxFrameSize min connectionWindowLeft
          val result = pullNextFrame(streamId, maxBytesToSend)
          def send(frame: DataFrame): Unit = {
            pushFrameOut(frame)
            connectionWindowLeft -= frame.payload.length
          }

          result match {
            case PullFrameResult.SendFrame(frame, hasMore) =>
              send(frame)
              if (hasMore) {
                enqueueStream(streamId)
                WaitingForNetworkToSendData
              } else {
                if (sendableOutstreams.isEmpty) Idle
                else WaitingForNetworkToSendData
              }
            case PullFrameResult.SendFrameAndTrailer(frame, trailer) =>
              send(frame)
              controlFrameBuffer += trailer
              WaitingForNetworkToSendControlFrames
          }
        }
      }

      // Multiplexer state machine
      // Idle: No data to send, no demand from the network (i.e. we were not yet pulled)
      // WaitingForData: Got demand from the network but no data to send
      // WaitingForNetworkToSendControlFrames: Control frames (and maybe data frames) are queued but there is no network demand
      // WaitingForNetworkToSendData: Data frames queued but no network demand
      // WaitingForConnectionWindow: Data frames queued, demand from the network, but no connection-level window available

      case object Idle extends MultiplexerState {
        def onPull(): MultiplexerState = WaitingForData
        def pushControlFrame(frame: FrameEvent): MultiplexerState = {
          controlFrameBuffer += frame
          WaitingForNetworkToSendControlFrames
        }
        def connectionWindowAvailable(): MultiplexerState = this
        def enqueueOutStream(streamId: Int): MultiplexerState = {
          enqueueStream(streamId)
          WaitingForNetworkToSendData
        }
        def closeStream(streamId: Int): MultiplexerState = this
      }

      case object WaitingForData extends MultiplexerState {
        def onPull(): MultiplexerState = throw new IllegalStateException(s"pull unexpected while waiting for data")
        def pushControlFrame(frame: FrameEvent): MultiplexerState = {
          pushFrameOut(frame)
          Idle
        }
        def connectionWindowAvailable(): MultiplexerState = this // nothing to do, as there is no data to send
        def enqueueOutStream(streamId: Int): MultiplexerState =
          if (connectionWindowLeft == 0) {
            enqueueStream(streamId)
            WaitingForConnectionWindow
          } else sendDataFrame(streamId)
        def closeStream(streamId: Int): MultiplexerState = this
      }

      /** Not yet pulled but data waiting to be sent */
      case object WaitingForNetworkToSendControlFrames extends MultiplexerState {
        def onPull(): MultiplexerState = {
          val first = controlFrameBuffer.dequeue()
          pushFrameOut(first)
          if (controlFrameBuffer.isEmpty && sendableOutstreams.isEmpty) Idle
          else if (controlFrameBuffer.isEmpty) WaitingForNetworkToSendData
          else this
        }
        def pushControlFrame(frame: FrameEvent): MultiplexerState = {
          controlFrameBuffer += frame
          this
        }
        def connectionWindowAvailable(): MultiplexerState = this
        def enqueueOutStream(streamId: Int): MultiplexerState = {
          enqueueStream(streamId)
          this
        }

        def closeStream(streamId: Int): MultiplexerState = {
          // expensive operation, but only called for cancelled streams
          dequeueStream(streamId)
          this
        }
      }

      abstract class WithSendableOutStreams extends MultiplexerState {
        protected def sendNext(): MultiplexerState =
          if (prioritizer eq StreamPrioritizer.First)
            sendDataFrame(sendableOutstreams.dequeue())
          else {
            val chosenId = prioritizer.chooseSubstream(sendableOutstreams.toSet)
            // expensive operation, to be optimized when prioritizers can be configured
            dequeueStream(chosenId)
            sendDataFrame(chosenId)
          }

        def closeStream(streamId: Int): MultiplexerState = {
          // expensive operation, but only called for cancelled streams
          dequeueStream(streamId)
          if (sendableOutstreams.nonEmpty) this
          else if (pulled) WaitingForData
          else Idle
        }

        def pulled: Boolean
      }

      case object WaitingForNetworkToSendData extends WithSendableOutStreams {
        def onPull(): MultiplexerState =
          if (connectionWindowLeft > 0) sendNext()
          else // do nothing and wait for window first
            WaitingForConnectionWindow

        def pushControlFrame(frame: FrameEvent): MultiplexerState = {
          controlFrameBuffer += frame
          WaitingForNetworkToSendControlFrames
        }
        def connectionWindowAvailable(): MultiplexerState = this
        def enqueueOutStream(streamId: Int): MultiplexerState = {
          enqueueStream(streamId)
          this
        }

        override def pulled = false
      }

      /** Pulled and data is pending but no connection-level window available */
      case object WaitingForConnectionWindow extends WithSendableOutStreams {
        def onPull(): MultiplexerState =
          throw new IllegalStateException(s"pull unexpected while waiting for connection window")
        def pushControlFrame(frame: FrameEvent): MultiplexerState = {
          pushFrameOut(frame)
          WaitingForNetworkToSendData
        }
        def connectionWindowAvailable(): MultiplexerState = sendNext()
        def enqueueOutStream(streamId: Int): MultiplexerState = {
          enqueueStream(streamId)
          this
        }

        override def pulled = true
      }

      def maxBytesToBufferPerSubstream = 2 * currentMaxFrameSize // for now, let's buffer two frames per substream
    }