in http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Demux.scala [233:543]
/**
 * Turns a parsed frame of trailing headers into an optional entity chunk-stream part.
 *
 * Implemented by the concrete stage; the stage logic created below forwards its own
 * `wrapTrailingHeaders` to this method.
 *
 * NOTE(review): presumably `None` means the trailing headers are not surfaced to the entity
 * stream — confirm against the implementations.
 */
def wrapTrailingHeaders(headers: ParsedHeadersFrame): Option[HttpEntity.ChunkStreamPart]
/**
 * Delay used for the `CompletionTimeout` timer that is armed when `substreamIn` finishes on the
 * client side; when that timer fires, stream handling is shut down and the stage is completed.
 */
def completionTimeout: FiniteDuration
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, ServerTerminator) = {
object Logic extends TimerGraphStageLogic(shape) with Http2MultiplexerSupport with Http2StreamHandling
with GenericOutletSupport with StageLogging with LogHelper with ServerTerminator {
logic =>
import Http2Demux.CompletionTimeout
// Forwards to the enclosing stage's `wrapTrailingHeaders`, so the stage decides how trailing
// headers are represented.
def wrapTrailingHeaders(headers: ParsedHeadersFrame): Option[HttpEntity.ChunkStreamPart] =
stage.wrapTrailingHeaders(headers)
// Whether this logic runs the server side of the connection; taken from the enclosing stage.
override def isServer: Boolean = stage.isServer
// The HTTP/2 settings this logic operates with (the `http2Settings` value of the enclosing scope).
override def settings: Http2CommonSettings = http2Settings
// Mirrors the `upgraded` value of the enclosing scope.
// NOTE(review): presumably true for connections established through an h2c upgrade — confirm.
override def isUpgraded: Boolean = upgraded
// Log under the class of the concrete demux flavour (client or server) this logic belongs to.
override protected def logSource: Class[_] =
  if (!isServer) classOf[Http2ClientDemux]
  else classOf[Http2ServerDemux]
// Cache the debug flag: as a lazy val, `super.isDebugEnabled` is evaluated at most once (on first
// access) and the stored result is reused afterwards instead of being queried on every check.
override lazy val isDebugEnabled: Boolean = super.isDebugEnabled
// Completed in `postStop`; its future is what `terminate` hands back to the caller.
private val terminationPromise = Promise[Http.HttpTerminated]()
// Set to true by `triggerTermination`; from then on frames for streams newer than
// `lastIdBeforeTermination` are refused in the `frameIn` handler.
private var terminating: Boolean = false
// Value of `lastStreamId()` captured when termination was triggered.
private var lastIdBeforeTermination: Int = 0
// Async callback through which `terminate` passes the deadline on to `triggerTermination`.
private val terminateCallback = getAsyncCallback[FiniteDuration](triggerTermination)
/**
 * Requests termination of this connection.
 *
 * The request is handed to the stage through `terminateCallback`; the actual work is done by
 * `triggerTermination`.
 *
 * @param deadline passed on to `triggerTermination` as the delay of the completion timer
 * @return the future of `terminationPromise`, which is completed in `postStop`
 */
override def terminate(deadline: FiniteDuration)(implicit ex: ExecutionContext): Future[Http.HttpTerminated] = {
  val whenTerminated = terminationPromise.future
  terminateCallback.invoke(deadline)
  whenTerminated
}
/**
 * Starts termination of this connection unless it is already in progress.
 *
 * Marks the logic as terminating, sends a GOAWAY frame with `NO_ERROR`, records the current
 * `lastStreamId()` and calls `completeIfDone()`. If `frameOut` is still open afterwards, the
 * `CompletionTimeout` timer is armed with the given deadline.
 *
 * @param deadline delay after which the `CompletionTimeout` timer fires
 */
private def triggerTermination(deadline: FiniteDuration): Unit =
  // check if we are already terminating, otherwise start termination
  if (!terminating) {
    // Log the actual grace period: `CompletionTimeout` is only the timer key, not a duration.
    log.debug(
      s"Termination of this connection was triggered. Sending GOAWAY and waiting for open requests to complete for $deadline.")
    terminating = true
    pushGOAWAY(ErrorCode.NO_ERROR, "Voluntary connection close.")
    lastIdBeforeTermination = lastStreamId()
    completeIfDone()
    if (!isClosed(frameOut))
      scheduleOnce(CompletionTimeout, deadline)
  }
/**
 * Called when `frameOut` is finished: shuts down stream handling right away.
 */
def frameOutFinished(): Unit =
  // make sure we clean up/fail substreams with a custom failure before stage is canceled
  // and substream autoclean kicks in
  shutdownStreamHandling()
/**
 * Pushes the given frame to `frameOut`, first notifying the ping state via `onDataFrameSeen()`.
 *
 * NOTE(review): presumably every outgoing frame counts as connection activity for the ping
 * logic — confirm against `ConfigurablePing.PingState`.
 */
override def pushFrameOut(event: FrameEvent): Unit = {
pingState.onDataFrameSeen()
push(frameOut, event)
}
// The multiplexer is also the `OutHandler` of `frameOut`; it is created with the
// `StreamPrioritizer.First` prioritizer and installed as the handler right below.
val multiplexer: Http2Multiplexer with OutHandler = createMultiplexer(StreamPrioritizer.First)
setHandler(frameOut, multiplexer)
// Ping bookkeeping built from `http2Settings`; it supplies the tick interval scheduled in
// `preStart` and is consulted in `onTimer` and in the `frameIn` handler.
val pingState = ConfigurablePing.PingState(http2Settings)
// Initial settings derived from the local application.conf. For simplicity they are enforced
// immediately, even before the peer has acknowledged them.
// Reminder: the receiver of a SETTINGS frame must process them in the order they are received.
val initialLocalSettings: immutable.Seq[Setting] = {
  val maxConcurrentStreams =
    Setting(SettingIdentifier.SETTINGS_MAX_CONCURRENT_STREAMS, http2Settings.maxConcurrentStreams)
  if (isServer) immutable.Seq(maxConcurrentStreams)
  else immutable.Seq(maxConcurrentStreams, Setting(SettingIdentifier.SETTINGS_ENABLE_PUSH, 0)) // only on client
}
/**
 * Stage start-up: applies any initial remote settings, requests the first input, sends the local
 * SETTINGS frame and, if the ping state provides a tick interval, starts the periodic ping tick.
 */
override def preStart(): Unit = {
  if (initialRemoteSettings.nonEmpty) {
    debug(s"Applying ${initialRemoteSettings.length} initial settings!")
    applyRemoteSettings(initialRemoteSettings)
  }

  pullFrameIn()
  // applyRemoteSettings may or may not have pulled `substreamIn` already
  tryPullSubStreams()

  // both client and server must send a settings frame as first frame
  val localSettingsFrame = SettingsFrame(initialLocalSettings)
  multiplexer.pushControlFrame(localSettingsFrame)

  // to limit overhead rather than constantly rescheduling a timer and looking at system time we use a constant timer
  for (interval <- pingState.tickInterval())
    schedulePeriodically(ConfigurablePing.Tick, interval)
}
/**
 * Sends a GOAWAY control frame carrying the current `lastStreamId()`.
 *
 * @param errorCode error code to put into the frame
 * @param debug     text sent as the frame's debug payload
 */
override def pushGOAWAY(errorCode: ErrorCode, debug: String): Unit =
  // FIXME: handle the connection closing according to the specification
  multiplexer.pushControlFrame(GoAwayFrame(lastStreamId(), errorCode, ByteString(debug)))
// Whether `frameIn` may currently be pulled; checked by `pullFrameIn`.
private[this] var allowReadingIncomingFrames: Boolean = true

/**
 * Enables or disables reading from `frameIn`.
 *
 * When reading is switched back on, `frameIn` is pulled unless a pull is already pending.
 * Switching it off only records the new state.
 */
override def allowReadingIncomingFrames(allow: Boolean): Unit = {
  val stateChanges = allow != allowReadingIncomingFrames
  if (stateChanges && allow) {
    debug("Resume reading incoming frames")
    if (!hasBeenPulled(frameIn)) pull(frameIn)
  } else if (stateChanges) {
    // can't retract pending pull but that's ok
    debug("Suspended reading incoming frames")
  }
  allowReadingIncomingFrames = allow
}
/** Pulls `frameIn` if reading is allowed, no pull is pending and the port is still open. */
def pullFrameIn(): Unit = {
  val mayPull = allowReadingIncomingFrames && !hasBeenPulled(frameIn) && !isClosed(frameIn)
  if (mayPull) pull(frameIn)
}
/**
 * Pulls `substreamIn` when the port is open and not already pulled. On the client side this
 * additionally requires `hasCapacityToCreateStreams`.
 */
def tryPullSubStreams(): Unit = {
  val portIdle = !hasBeenPulled(substreamIn) && !isClosed(substreamIn)
  // While we don't support PUSH_PROMISE there's only capacity control on the client
  if (portIdle && (isServer || hasCapacityToCreateStreams)) pull(substreamIn)
}
// -----------------------------------------------------------------
setHandler(frameIn,
new InHandler {
/**
 * Handles one incoming frame and then requests the next one via `pullFrameIn()`.
 *
 * Every frame except PING is first reported to the ping state. The frame is then dispatched by
 * type: connection-level window updates and priorities go to the multiplexer, stream events go
 * to stream handling (or are refused while terminating), and SETTINGS and PING frames are
 * answered.
 */
def onPush(): Unit = {
  val incoming = grab(frameIn)

  incoming match {
    case _: PingFrame => // handle later
    case _            => pingState.onDataFrameSeen()
  }

  incoming match {
    // only the connection-level window (stream 0); other window updates fall through to StreamFrameEvent
    case WindowUpdateFrame(0, increment) =>
      multiplexer.updateConnectionLevelWindow(increment)

    case priority: PriorityFrame =>
      multiplexer.updatePriority(priority)

    case streamEvent: StreamFrameEvent =>
      val accepted = !terminating || streamEvent.streamId <= lastIdBeforeTermination
      if (accepted) handleStreamEvent(streamEvent)
      else
        // make clear that we are not accepting any more data on other streams
        multiplexer.pushControlFrame(RstStreamFrame(streamEvent.streamId, ErrorCode.REFUSED_STREAM))

    case SettingsFrame(remoteSettings) =>
      if (remoteSettings.nonEmpty) debug(s"Got ${remoteSettings.length} settings!")
      // acknowledge only if everything could be applied
      if (applyRemoteSettings(remoteSettings))
        multiplexer.pushControlFrame(SettingsAckFrame(remoteSettings))

    case SettingsAckFrame(_) =>
    // Currently, we only expect an ack for the initial settings frame, sent
    // above in preStart. Since only some settings are supported, and those
    // settings are non-modifiable and known at construction time, these settings
    // are enforced from the start of the connection so there's no need to invoke
    // `enforceSettings(initialLocalSettings)`

    case PingFrame(true, payload) =>
      if (payload == ConfigurablePing.Ping.data) pingState.onPingAck()
      else
        // We only ever push static data, responding with anything else is wrong
        pushGOAWAY(ErrorCode.PROTOCOL_ERROR, "Ping ack contained unexpected data")

    case PingFrame(false, payload) =>
      multiplexer.pushControlFrame(PingFrame(ack = true, payload))

    case other =>
      // ignore unknown frames
      debug(s"Got unhandled event $other")
  }

  pullFrameIn()
}
/**
 * Translates failures of `frameIn` into protocol-level reactions where possible.
 *
 * Compliance exceptions become a GOAWAY or a stream reset. A `ParsingException` is unwrapped to
 * its cause (if any) and handled again. Every other non-fatal failure goes to the default handling.
 */
override def onUpstreamFailure(ex: Throwable): Unit =
  ex match {
    // every IllegalHttp2StreamIdException will be a GOAWAY with PROTOCOL_ERROR
    case illegalId: Http2Compliance.IllegalHttp2StreamIdException =>
      pushGOAWAY(ErrorCode.PROTOCOL_ERROR, illegalId.getMessage)

    case protocolError: Http2Compliance.Http2ProtocolException =>
      pushGOAWAY(protocolError.errorCode, protocolError.getMessage)

    case streamError: Http2Compliance.Http2ProtocolStreamException =>
      resetStream(streamError.streamId, streamError.errorCode)

    case parsing: ParsingException =>
      Option(parsing.getCause) match {
        // unwrap the cause, which should carry ComplianceException and recurse
        case Some(cause) => onUpstreamFailure(cause)
        // fail with the raw parsing exception
        case None => super.onUpstreamFailure(parsing)
      }

    // handle every unhandled exception
    case NonFatal(other) =>
      super.onUpstreamFailure(other)
  }
})
// -----------------------------------------------------------------
// FIXME: What if user handler doesn't pull in new substreams? Should we reject them
// after a while or buffer only a limited amount?
// Buffered outlet in front of `substreamOut`: `dispatchSubstream` pushes new substreams into it
// and the client-side branch of `completeIfDone` completes it.
val bufferedSubStreamOutput = new BufferedOutlet[Http2SubStream](fromOutlet(substreamOut))
/**
 * Wraps the given headers, data and correlation attributes into an `Http2SubStream` (without
 * trailing headers, i.e. `OptionVal.None`) and pushes it to the buffered substream output.
 */
override def dispatchSubstream(initialHeaders: ParsedHeadersFrame, data: Either[ByteString, Source[Any, Any]],
    correlationAttributes: Map[AttributeKey[_], _]): Unit = {
  val substream = Http2SubStream(initialHeaders, OptionVal.None, data, correlationAttributes)
  bufferedSubStreamOutput.push(substream)
}
// -----------------------------------------------------------------
// Both callbacks re-check, via `completeIfDone()`, whether the connection can be completed now.
override def onAllStreamsClosed(): Unit = completeIfDone()
override def onAllDataFlushed(): Unit = completeIfDone()
/**
 * Completes the connection once nothing is left to do. That is the case when no further outgoing
 * streams can appear (terminating, or `substreamIn` closed), no stream is active any more, and
 * all outgoing data has been flushed (or `frameOut` is closed).
 *
 * The server completes the whole stage at once. The client closes its ports individually and
 * leaves the final completion to the buffered substream output.
 */
private def completeIfDone(): Unit = {
  def finishClientSide(): Unit = {
    cancel(frameIn)
    complete(frameOut)
    cancel(substreamIn)
    // Using complete here (instead of a simpler `completeStage`) will make sure the buffer can be
    // drained by the user before completely shutting down the stage finally.
    // This is only relevant on the client side where `activeStreamCount` can already be 0 because
    // from the HTTP/2 implementation side all streams have been handled, i.e. all responses have been
    // dispatched. However, some of the dispatched responses might still be stuck in the buffer for the user
    // to be fetched. To avoid losing them, we schedule completion here and rely on the final completion of the buffer
    // to terminate the whole stage.
    bufferedSubStreamOutput.complete()
  }

  val streamsDone = (terminating || isClosed(substreamIn)) && activeStreamCount() == 0
  // the flush state is only consulted once all streams are done
  if (streamsDone && (isClosed(frameOut) || multiplexer.hasFlushedAllData)) {
    log.debug("Closing connection after all streams are done and all data has been flushed.")
    if (isServer) completeStage()
    else finishClientSide()
  }
}
// -----------------------------------------------------------------
// Handler for substreams created by the user side (`substreamIn`).
setHandler(substreamIn,
  new InHandler {
    def onPush(): Unit = {
      handleOutgoingCreated(grab(substreamIn))
      // Once the incoming stream is handled, we decide if we need to pull more.
      tryPullSubStreams()
    }

    override def onUpstreamFinish(): Unit =
      if (!isServer) {
        // on the client side allow ongoing responses to be delivered for a while even if requests are done
        completeIfDone()
        scheduleOnce(CompletionTimeout, completionTimeout)
      } else {
        // on the server side conservatively shut everything down if user handler completes prematurely
        super.onUpstreamFinish()
      }
  })
/**
 * Applies the settings received from the remote peer (or injected via the "HTTP2-Settings"
 * header in "h2c") to this side of the connection.
 *
 * All settings are processed in order, even after one of them has been rejected.
 *
 * @param settings settings sent from the other peer
 * @return true if every setting was applied, false if an ERROR was raised. In the error case the
 *         error has already been pushed back to the peer by this method.
 */
private def applyRemoteSettings(settings: immutable.Seq[Setting]): Boolean =
  settings.foldLeft(true) { (allAppliedSoFar, setting) =>
    val applied = setting match {
      case Setting(Http2Protocol.SettingIdentifier.SETTINGS_INITIAL_WINDOW_SIZE, size) if size < 0 =>
        pushGOAWAY(FLOW_CONTROL_ERROR, s"Invalid value for SETTINGS_INITIAL_WINDOW_SIZE: $size")
        false

      case Setting(Http2Protocol.SettingIdentifier.SETTINGS_INITIAL_WINDOW_SIZE, size) =>
        debug(s"Setting initial window to $size")
        multiplexer.updateDefaultWindow(size)
        true

      case Setting(Http2Protocol.SettingIdentifier.SETTINGS_MAX_FRAME_SIZE, size) =>
        multiplexer.updateMaxFrameSize(size)
        true

      case Setting(Http2Protocol.SettingIdentifier.SETTINGS_MAX_CONCURRENT_STREAMS, limit) =>
        setMaxConcurrentStreams(limit)
        // once maxConcurrentStreams is updated, see if we can pull again
        tryPullSubStreams()
        true

      case Setting(id, value) =>
        debug(s"Ignoring setting $id -> $value (in Demux)")
        true
    }
    // no short-circuit on the application itself: `applied` has already been computed above
    allAppliedSoFar && applied
  }
/**
 * Timer dispatch: handles the periodic ping tick and the completion timeout.
 */
override protected def onTimer(timerKey: Any): Unit = {
  // Ping bookkeeping; only active while there are streams, otherwise the ping state is reset.
  def onPingTick(): Unit =
    if (activeStreamCount() > 0) {
      pingState.onTick()
      if (pingState.pingAckOverdue())
        pushGOAWAY(ErrorCode.PROTOCOL_ERROR, "Ping ack timeout")
      else if (pingState.shouldEmitPing()) {
        pingState.sendingPing()
        multiplexer.pushControlFrame(ConfigurablePing.Ping)
      }
    } else pingState.clear()

  // The peer took too long: shut down stream handling and complete the stage.
  def onCompletionTimeout(): Unit = {
    info(
      "Timeout: Peer didn't finish in-flight requests. Closing pending HTTP/2 streams. Increase this timeout via the 'completion-timeout' setting.")
    shutdownStreamHandling()
    completeStage()
  }

  timerKey match {
    case ConfigurablePing.Tick => onPingTick()
    case CompletionTimeout     => onCompletionTimeout()
  }
}
/**
 * Final cleanup when the stage stops: shuts down stream handling and then completes
 * `terminationPromise`, so the future returned by `terminate` resolves with
 * `Http.HttpConnectionTerminated`.
 */
override def postStop(): Unit = {
shutdownStreamHandling()
terminationPromise.success(Http.HttpConnectionTerminated)
}
}
(Logic, Logic)
}