in http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala [679:823]
/** Initial attributes of this stage: only the name attribute, set to "ProtocolSwitchStage". */
override def initialAttributes: Attributes =
  Attributes.name("ProtocolSwitchStage")
/**
 * Bidirectional shape of the stage: `fromHttp` -> `toNet` is the first (outgoing) direction,
 * `fromNet` -> `toHttp` the second (incoming) one.
 */
override val shape =
  BidiShape(in1 = fromHttp, out1 = toNet, in2 = fromNet, out2 = toHttp)
/**
 * Creates the logic of the protocol switch stage.
 *
 * Until a `SwitchToOtherProtocol` element arrives on `fromHttp`, the stage relays
 * `fromHttp` -> `toNet` and `fromNet` -> `toHttp`. When the switch element arrives, its
 * bytes are pushed to `toNet`, the HTTP-side ports are closed and the network ports are
 * rewired to the handler flow carried by the element (see `switchToOtherProtocol`).
 *
 * @param inheritedAttributes attributes passed in by the materializer (not read here)
 * @return a timer-capable stage logic bound to `shape`
 */
def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
  import pekko.http.impl.engine.rendering.ResponseRenderingOutput._

  /*
   * These handlers are in charge until a switch command comes in, then they
   * are replaced.
   */
  setHandler(fromHttp,
    new InHandler {
      override def onPush(): Unit =
        grab(fromHttp) match {
          case HttpData(b) => push(toNet, b)
          case SwitchToOtherProtocol(bytes, handlerFlow) =>
            // flush the final bytes of the HTTP side, shut the HTTP side down, then hand over
            push(toNet, bytes)
            complete(toHttp)
            cancel(fromHttp)
            switchToOtherProtocol(handlerFlow)
        }
      override def onUpstreamFinish(): Unit = complete(toNet)
      override def onUpstreamFailure(ex: Throwable): Unit = fail(toNet, ex)
    })
  setHandler(toNet,
    new OutHandler {
      override def onPull(): Unit = pull(fromHttp)
      override def onDownstreamFinish(): Unit = completeStage()
    })
  setHandler(fromNet,
    new InHandler {
      override def onPush(): Unit = push(toHttp, grab(fromNet))
      override def onUpstreamFinish(): Unit = complete(toHttp)
      override def onUpstreamFailure(ex: Throwable): Unit = fail(toHttp, ex)
    })
  setHandler(toHttp,
    new OutHandler {
      override def onPull(): Unit = pull(fromNet)
      override def onDownstreamFinish(): Unit = cancel(fromNet)
    })

  // Number of subscription timeouts that are scheduled and have neither fired nor been cancelled.
  // While it is non-zero the stage is kept alive via setKeepGoing(true).
  private var activeTimers = 0

  // Subscription timeout duration, read from the materializer settings on every access.
  private def timeout = ActorMaterializerHelper.downcast(materializer).settings.subscriptionTimeoutSettings.timeout

  /** Schedules `s` to fire once after `timeout`, enabling keep-going for the first active timer. */
  private def addTimeout(s: SubscriptionTimeout): Unit = {
    if (activeTimers == 0) setKeepGoing(true)
    activeTimers += 1
    scheduleOnce(s, timeout)
  }

  /** Cancels `s` if it is still pending, disabling keep-going once no timer is left. */
  private def cancelTimeout(s: SubscriptionTimeout): Unit =
    if (isTimerActive(s)) {
      activeTimers -= 1
      if (activeTimers == 0) setKeepGoing(false)
      cancelTimer(s)
    }

  /** Runs the callback carried by a fired `SubscriptionTimeout` key after updating the bookkeeping. */
  override def onTimer(timerKey: Any): Unit = timerKey match {
    case SubscriptionTimeout(f) =>
      activeTimers -= 1
      if (activeTimers == 0) setKeepGoing(false)
      f()
  }

  /**
   * Rewires the network ports to `newFlow`: output of the flow is pushed to `toNet`, and
   * (unless `fromNet` is already closed) bytes arriving on `fromNet` are fed into the flow.
   * The flow is materialized with the `subFusingMaterializer`.
   *
   * @param newFlow the handler flow that takes over the connection after the switch
   */
  def switchToOtherProtocol(newFlow: Flow[ByteString, ByteString, Any]): Unit = {
    // receives the output of the new flow and relays it to the network
    val sinkIn = new SubSinkInlet[ByteString]("FrameSink")
    sinkIn.setHandler(new InHandler {
      override def onPush(): Unit = push(toNet, sinkIn.grab())
      override def onUpstreamFinish(): Unit = complete(toNet)
      override def onUpstreamFailure(ex: Throwable): Unit = fail(toNet, ex)
    })

    if (isClosed(fromNet)) {
      // network input is already closed: run the flow against an empty source
      setHandler(toNet,
        new OutHandler {
          override def onPull(): Unit = sinkIn.pull()
          override def onDownstreamFinish(): Unit = {
            completeStage()
            sinkIn.cancel()
          }
        })
      newFlow.runWith(Source.empty, sinkIn.sink)(subFusingMaterializer)
    } else {
      // feeds the bytes arriving from the network into the new flow
      val sourceOut = new SubSourceOutlet[ByteString]("FrameSource")

      // fires if the first pull on sourceOut does not happen within `timeout`
      // (the key is cancelled on that first pull)
      val timeoutKey = SubscriptionTimeout(() => {
        sourceOut.timeout(timeout)
        if (sourceOut.isClosed) completeStage()
      })
      addTimeout(timeoutKey)

      setHandler(toNet,
        new OutHandler {
          override def onPull(): Unit = sinkIn.pull()
          override def onDownstreamFinish(): Unit = {
            completeStage()
            sinkIn.cancel()
            sourceOut.complete()
          }
        })

      setHandler(fromNet,
        new InHandler {
          override def onPush(): Unit = {
            // if the new flow has not pulled yet the element is left on the port and
            // picked up by the first onPull of sourceOut
            if (sourceOut.isAvailable) {
              sourceOut.push(grab(fromNet).bytes)
            }
          }
          override def onUpstreamFinish(): Unit =
            // An element that arrived before the first pull may still be parked on the port.
            // Completing sourceOut now would drop it, so in that case completion is deferred
            // to the first onPull of sourceOut, which delivers the element and then completes.
            if (!isAvailable(fromNet)) sourceOut.complete()
          override def onUpstreamFailure(ex: Throwable): Unit = sourceOut.fail(ex)
        })
      sourceOut.setHandler(new OutHandler {
        override def onPull(): Unit = {
          // This check only needed on the first pull due to potential element
          // pushed in response to pull by previous source
          if (isAvailable(fromNet)) {
            sourceOut.push(grab(fromNet).bytes)
            // upstream finished while the element was parked: finish the deferred completion
            if (isClosed(fromNet)) sourceOut.complete()
          } else if (!hasBeenPulled(fromNet)) {
            pull(fromNet)
          }

          cancelTimeout(timeoutKey)
          // from the second pull on, simply forward demand to the network
          sourceOut.setHandler(new OutHandler {
            override def onPull(): Unit = if (!hasBeenPulled(fromNet)) pull(fromNet)
            override def onDownstreamFinish(): Unit = cancel(fromNet)
          })
        }
        override def onDownstreamFinish(): Unit = cancel(fromNet)
      })

      // disable the old handlers, at this point we might still get something due to cancellation delay which we need to ignore
      setHandlers(fromHttp, toHttp,
        new InHandler with OutHandler {
          override def onPush(): Unit = ()
          override def onPull(): Unit = ()
          override def onUpstreamFinish(): Unit = ()
          override def onUpstreamFailure(ex: Throwable): Unit = ()
          override def onDownstreamFinish(): Unit = ()
        })

      newFlow.runWith(sourceOut.source, sinkIn.sink)(subFusingMaterializer)
    }
  }
}