def wrapTrailingHeaders()

in http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Demux.scala [233:543]


  def wrapTrailingHeaders(headers: ParsedHeadersFrame): Option[HttpEntity.ChunkStreamPart]
  def completionTimeout: FiniteDuration
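  // The two abstract members above are the server/client extension points of this stage:
  // wrapTrailingHeaders turns a trailing HEADERS frame into an optional final chunk of the
  // entity (see the hedged sketch after the listing), and completionTimeout bounds how long
  // a draining connection waits for in-flight streams (see the CompletionTimeout timer below).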

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, ServerTerminator) = {
    object Logic extends TimerGraphStageLogic(shape) with Http2MultiplexerSupport with Http2StreamHandling
        with GenericOutletSupport with StageLogging with LogHelper with ServerTerminator {
      logic =>

      import Http2Demux.CompletionTimeout

      def wrapTrailingHeaders(headers: ParsedHeadersFrame): Option[HttpEntity.ChunkStreamPart] =
        stage.wrapTrailingHeaders(headers)

      override def isServer: Boolean = stage.isServer

      override def settings: Http2CommonSettings = http2Settings

      override def isUpgraded: Boolean = upgraded

      override protected def logSource: Class[_] =
        if (isServer) classOf[Http2ServerDemux] else classOf[Http2ClientDemux]

      // cache the debug-enabled flag once at start-up so it doesn't have to be queried on every log call
      override lazy val isDebugEnabled: Boolean = super.isDebugEnabled

      private val terminationPromise = Promise[Http.HttpTerminated]()
      private var terminating: Boolean = false
      private var lastIdBeforeTermination: Int = 0
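      // terminate() may be invoked from arbitrary threads via the materialized ServerTerminator,
      // so the request is marshalled onto the stage's own thread through an async callback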
      private val terminateCallback = getAsyncCallback[FiniteDuration](triggerTermination)
      override def terminate(deadline: FiniteDuration)(implicit ex: ExecutionContext): Future[Http.HttpTerminated] = {
        terminateCallback.invoke(deadline)
        terminationPromise.future
      }
      private def triggerTermination(deadline: FiniteDuration): Unit =
        // start termination unless it is already in progress
        if (!terminating) {
          log.debug(
            s"Termination of this connection was triggered. Sending GOAWAY and waiting for open requests to complete for $deadline.")
          terminating = true
          pushGOAWAY(ErrorCode.NO_ERROR, "Voluntary connection close.")
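          // remember the highest stream id seen so far; streams the peer opens after the
          // GOAWAY are refused with RST_STREAM(REFUSED_STREAM) in the frameIn handler below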
          lastIdBeforeTermination = lastStreamId()
          completeIfDone()
          if (!isClosed(frameOut))
            scheduleOnce(CompletionTimeout, deadline)
        }

      def frameOutFinished(): Unit = {
        // make sure we clean up/fail substreams with a custom failure before the stage is canceled
        // and the automatic substream cleanup kicks in
        shutdownStreamHandling()
      }

      override def pushFrameOut(event: FrameEvent): Unit = {
        pingState.onDataFrameSeen()
        push(frameOut, event)
      }

      val multiplexer: Http2Multiplexer with OutHandler = createMultiplexer(StreamPrioritizer.First)
      setHandler(frameOut, multiplexer)
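      // the multiplexer just registered doubles as the OutHandler for frameOut: whenever
      // downstream pulls, it decides which control frame or stream data to flush next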

      val pingState = ConfigurablePing.PingState(http2Settings)

      // Send initial settings based on the local application.conf. For simplicity, these settings are
      // enforced immediately even before the acknowledgement is received.
      // Reminder: the receiver of a SETTINGS frame must process them in the order they are received.
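      // SETTINGS_MAX_CONCURRENT_STREAMS caps how many concurrent streams the peer may open
      // towards us; SETTINGS_ENABLE_PUSH = 0 tells the server not to send PUSH_PROMISE frames
      // and is only meaningful when sent by a client, hence the filter below.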
      val initialLocalSettings: immutable.Seq[Setting] = immutable.Seq(
        Setting(SettingIdentifier.SETTINGS_MAX_CONCURRENT_STREAMS, http2Settings.maxConcurrentStreams)) ++
        immutable.Seq(Setting(SettingIdentifier.SETTINGS_ENABLE_PUSH, 0)).filter(_ => !isServer) // only on client

      override def preStart(): Unit = {
        if (initialRemoteSettings.nonEmpty) {
          debug(s"Applying ${initialRemoteSettings.length} initial settings!")
          applyRemoteSettings(initialRemoteSettings)
        }

        pullFrameIn()
        // applyRemoteSettings may or may not have pulled `substreamIn` already
        tryPullSubStreams()

        // both client and server must send a settings frame as first frame
        multiplexer.pushControlFrame(SettingsFrame(initialLocalSettings))

        pingState.tickInterval().foreach(interval =>
          // to limit overhead, we use a fixed-interval timer instead of constantly rescheduling one and checking the system time
          schedulePeriodically(ConfigurablePing.Tick, interval))
      }

      override def pushGOAWAY(errorCode: ErrorCode, debug: String): Unit = {
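        // GOAWAY advertises the last stream id this peer will still process; the peer must
        // not expect streams with higher ids to be handled (RFC 7540, section 6.8)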
        val frame = GoAwayFrame(lastStreamId(), errorCode, ByteString(debug))
        multiplexer.pushControlFrame(frame)
        // FIXME: handle the connection closing according to the specification
      }

      private[this] var allowReadingIncomingFrames: Boolean = true
      override def allowReadingIncomingFrames(allow: Boolean): Unit = {
        if (allow != allowReadingIncomingFrames)
          if (allow) {
            debug("Resume reading incoming frames")
            if (!hasBeenPulled(frameIn)) pull(frameIn)
          } else debug("Suspended reading incoming frames") // can't retract pending pull but that's ok

        allowReadingIncomingFrames = allow
      }

      def pullFrameIn(): Unit =
        if (allowReadingIncomingFrames && !hasBeenPulled(frameIn) && !isClosed(frameIn)) pull(frameIn)

      def tryPullSubStreams(): Unit = {
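        // a pull here admits one more outgoing substream (a request on the client, a
        // response on the server) from the user handler into this demux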
        if (!hasBeenPulled(substreamIn) && !isClosed(substreamIn)) {
          // Since we don't support PUSH_PROMISE, only client-initiated streams exist, so capacity
          // control against the peer's SETTINGS_MAX_CONCURRENT_STREAMS only applies on the client
          if (isServer) pull(substreamIn)
          else if (hasCapacityToCreateStreams) pull(substreamIn)
        }
      }

      // -----------------------------------------------------------------
      setHandler(frameIn,
        new InHandler {

          def onPush(): Unit = {
            val frame = grab(frameIn)
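            // any frame other than PING counts as connection activity and resets the
            // keep-alive ping state; PING frames themselves are handled in the second match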
            frame match {
              case _: PingFrame => // handle later
              case _            => pingState.onDataFrameSeen()
            }
            frame match {
              case WindowUpdateFrame(streamId, increment)
                  if streamId == 0 /* else fall through to StreamFrameEvent */ =>
                multiplexer.updateConnectionLevelWindow(increment)
              case p: PriorityFrame => multiplexer.updatePriority(p)
              case s: StreamFrameEvent =>
                if (!terminating)
                  handleStreamEvent(s)
                else if (s.streamId <= lastIdBeforeTermination)
                  handleStreamEvent(s)
                else
                  // make clear that we are not accepting any more data on other streams
                  multiplexer.pushControlFrame(RstStreamFrame(s.streamId, ErrorCode.REFUSED_STREAM))

              case SettingsFrame(settings) =>
                if (settings.nonEmpty) debug(s"Got ${settings.length} settings!")

                val settingsAppliedOk = applyRemoteSettings(settings)
                if (settingsAppliedOk) {
                  multiplexer.pushControlFrame(SettingsAckFrame(settings))
                }

              case SettingsAckFrame(_) =>
              // Currently, we only expect an ack for the initial settings frame, sent
              // above in preStart. Since only some settings are supported, and those
              // settings are non-modifiable and known at construction time, these settings
              // are enforced from the start of the connection so there's no need to invoke
              // `enforceSettings(initialLocalSettings)`

              case PingFrame(true, data) =>
                if (data != ConfigurablePing.Ping.data) {
                  // We only ever send static ping payloads, so an ack carrying anything else is a protocol violation
                  pushGOAWAY(ErrorCode.PROTOCOL_ERROR, "Ping ack contained unexpected data")
                } else {
                  pingState.onPingAck()
                }
              case PingFrame(false, data) =>
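                // echo the peer's opaque payload back with the ACK flag set, as required
                // by RFC 7540, section 6.7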
                multiplexer.pushControlFrame(PingFrame(ack = true, data))

              case e =>
                debug(s"Got unhandled event $e")
              // ignore unknown frames
            }
            pullFrameIn()
          }

          override def onUpstreamFailure(ex: Throwable): Unit = {
            ex match {
              // an IllegalHttp2StreamIdException is answered with a GOAWAY carrying PROTOCOL_ERROR
              case e: Http2Compliance.IllegalHttp2StreamIdException =>
                pushGOAWAY(ErrorCode.PROTOCOL_ERROR, e.getMessage)

              case e: Http2Compliance.Http2ProtocolException =>
                pushGOAWAY(e.errorCode, e.getMessage)

              case e: Http2Compliance.Http2ProtocolStreamException =>
                resetStream(e.streamId, e.errorCode)

              case e: ParsingException =>
                e.getCause match {
                  case null  => super.onUpstreamFailure(e) // fail with the raw parsing exception
                  case cause => onUpstreamFailure(cause) // unwrap the cause (which should carry the compliance exception) and recurse
                }

              // fail the stage for any other non-fatal exception
              case NonFatal(e) =>
                super.onUpstreamFailure(e)
            }
          }
        })

      // -----------------------------------------------------------------
      // FIXME: What if user handler doesn't pull in new substreams? Should we reject them
      //        after a while or buffer only a limited amount?
      val bufferedSubStreamOutput = new BufferedOutlet[Http2SubStream](fromOutlet(substreamOut))
      override def dispatchSubstream(initialHeaders: ParsedHeadersFrame, data: Either[ByteString, Source[Any, Any]],
          correlationAttributes: Map[AttributeKey[_], _]): Unit =
        bufferedSubStreamOutput.push(Http2SubStream(initialHeaders, OptionVal.None, data, correlationAttributes))

      // -----------------------------------------------------------------
      override def onAllStreamsClosed(): Unit = completeIfDone()
      override def onAllDataFlushed(): Unit = completeIfDone()

      private def completeIfDone(): Unit = {
        val noMoreOutgoingStreams = (terminating || isClosed(substreamIn)) && activeStreamCount() == 0
        def allOutgoingDataFlushed = isClosed(frameOut) || multiplexer.hasFlushedAllData
        if (noMoreOutgoingStreams && allOutgoingDataFlushed) {
          log.debug("Closing connection after all streams are done and all data has been flushed.")
          if (isServer)
            completeStage()
          else {
            cancel(frameIn)
            complete(frameOut)
            cancel(substreamIn)

            // Using complete here (instead of the simpler `completeStage`) makes sure the buffer can be
            // drained by the user before the stage finally shuts down.
            // This is only relevant on the client side, where `activeStreamCount` can already be 0 because,
            // from the HTTP/2 implementation's point of view, all streams have been handled, i.e. all
            // responses have been dispatched. However, some of the dispatched responses might still be
            // sitting in the buffer waiting to be fetched by the user. To avoid losing them, we complete
            // the buffer here and rely on its final completion to terminate the whole stage.
            bufferedSubStreamOutput.complete()
          }
        }
      }

      // -----------------------------------------------------------------
      setHandler(substreamIn,
        new InHandler {
          def onPush(): Unit = {
            val sub = grab(substreamIn)
            handleOutgoingCreated(sub)
            // Once the incoming stream is handled, we decide if we need to pull more.
            tryPullSubStreams()
          }

          override def onUpstreamFinish(): Unit =
            if (isServer) // on the server side, conservatively shut everything down if the user handler completes prematurely
              super.onUpstreamFinish()
            else { // on the client side, allow ongoing responses to be delivered for a while even after requests are done
              completeIfDone()
              scheduleOnce(CompletionTimeout, completionTimeout)
            }
        })

      /**
       * Apply the settings received from the remote peer.
       *
       * @param settings settings sent by the other peer (or injected via the
       *                 "HTTP2-Settings" header during an "h2c" upgrade).
       * @return true if the settings were applied successfully, false if an error
       *         was raised. When raising an error, this method has already pushed
       *         the offending error frame back to the peer.
       */
      private def applyRemoteSettings(settings: immutable.Seq[Setting]): Boolean = {
        var settingsAppliedOk = true

        settings.foreach {
          case Setting(Http2Protocol.SettingIdentifier.SETTINGS_INITIAL_WINDOW_SIZE, value) =>
            if (value >= 0) {
              debug(s"Setting initial window to $value")
              multiplexer.updateDefaultWindow(value)
            } else {
              pushGOAWAY(FLOW_CONTROL_ERROR, s"Invalid value for SETTINGS_INITIAL_WINDOW_SIZE: $value")
              settingsAppliedOk = false
            }
          case Setting(Http2Protocol.SettingIdentifier.SETTINGS_MAX_FRAME_SIZE, value) =>
            multiplexer.updateMaxFrameSize(value)
          case Setting(Http2Protocol.SettingIdentifier.SETTINGS_MAX_CONCURRENT_STREAMS, value) =>
            setMaxConcurrentStreams(value)
            // once maxConcurrentStreams is updated, see if we can pull again
            tryPullSubStreams()
          case Setting(id, value) =>
            debug(s"Ignoring setting $id -> $value (in Demux)")
        }
        settingsAppliedOk
      }

      override protected def onTimer(timerKey: Any): Unit = timerKey match {
        case ConfigurablePing.Tick =>
          // don't do anything unless there are active streams
          if (activeStreamCount() > 0) {
            pingState.onTick()
            if (pingState.pingAckOverdue()) {
              pushGOAWAY(ErrorCode.PROTOCOL_ERROR, "Ping ack timeout")
            } else if (pingState.shouldEmitPing()) {
              pingState.sendingPing()
              multiplexer.pushControlFrame(ConfigurablePing.Ping)
            }
          } else {
            pingState.clear()
          }
        case CompletionTimeout =>
          info(
            "Timeout: Peer didn't finish in-flight requests. Closing pending HTTP/2 streams. Increase this timeout via the 'completion-timeout' setting.")

          shutdownStreamHandling()
          completeStage()
      }

      override def postStop(): Unit = {
        shutdownStreamHandling()
        terminationPromise.success(Http.HttpConnectionTerminated)
      }
    }
    (Logic, Logic)
  }
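
For context, a minimal sketch of how the materialized ServerTerminator might be driven from outside the stage. The drain helper and the 10-second deadline are illustrative assumptions, not part of this file; ServerTerminator is the internal trait implemented by the logic above and is assumed to be in scope:

  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration._

  // hypothetical helper: ask the connection to drain and log once it has terminated
  def drain(terminator: ServerTerminator): Unit =
    terminator.terminate(deadline = 10.seconds).foreach { _ =>
      // by now the GOAWAY has been sent, streams up to lastIdBeforeTermination had a chance
      // to finish, and newer peer-initiated streams were refused with REFUSED_STREAM
      println("HTTP/2 connection drained")
    }

And a hedged sketch of what a server-side wrapTrailingHeaders implementation could look like, assuming ParsedHeadersFrame exposes its decoded headers as keyValuePairs: Seq[(String, AnyRef)] (the real implementations live in the Http2ServerDemux/Http2ClientDemux subclasses and may differ):

  import org.apache.pekko.http.scaladsl.model.HttpEntity
  import org.apache.pekko.http.scaladsl.model.headers.RawHeader

  // expose HTTP/2 trailing headers as the trailer of the entity's final chunk;
  // returning None would drop the trailers instead
  def wrapTrailingHeaders(headers: ParsedHeadersFrame): Option[HttpEntity.ChunkStreamPart] =
    Some(HttpEntity.LastChunk(extension = "", trailer = headers.keyValuePairs.map {
      case (name, value) => RawHeader(name, value.toString)
    }))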