in http-core/src/main/scala/org/apache/pekko/http/impl/engine/ws/FrameOutHandler.scala [40:199]
/** The shape of this stage: a flow made up of the inlet `in` and the outlet `out`. */
override def shape = new FlowShape(in, out)
/** Returns a fresh deadline that expires `_closeTimeout` after the moment of the call. */
private def closeDeadline(): Deadline = _closeTimeout.fromNow
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
/**
 * Used from the upstream termination callbacks: if `out` is currently available for a push,
 * the `onPull` callback of whichever handler is installed on `out` right now is invoked
 * directly. If `out` is not available nothing happens.
 */
private def absorbTermination() = {
  val demandPending = isAvailable(out)
  if (demandPending) {
    val currentOutHandler = getHandler(out)
    currentOutHandler.onPull()
  }
}
/**
 * Handler that is installed on `in` initially: frames are passed on to `out`, while the
 * different close events switch `in` over to the handler of the matching closing state.
 */
private object Idle extends InHandler with ProcotolExceptionHandling {
  override def onPush() =
    grab(in) match {
      case start: FrameStart   => push(out, start)
      case DirectAnswer(frame) => push(out, frame)
      case PeerClosed(code, reason) =>
        val closedWithError = code.exists(Protocol.CloseCodes.isError)
        val closeFrame = FrameEvent.closeFrame(code.getOrElse(Protocol.CloseCodes.Regular), reason)
        if (!closedWithError) {
          // let user complete it, FIXME: maybe make configurable? immediately, or timeout
          setHandler(in, new WaitingForUserHandlerClosed(closeFrame))
          pull(in)
        } else if (serverSide) {
          // error close code on the server side: answer with the close frame and finish right away
          push(out, closeFrame)
          completeStage()
        } else {
          // error close code on the client side: answer, then wait in WaitingForTransportClose
          setHandler(in, new WaitingForTransportClose)
          push(out, closeFrame)
        }
      case ActivelyCloseWithCode(code, reason) =>
        sendCloseFrameAndAwaitPeer(FrameEvent.closeFrame(code.getOrElse(Protocol.CloseCodes.Regular), reason))
      case UserHandlerCompleted =>
        sendCloseFrameAndAwaitPeer(FrameEvent.closeFrame(Protocol.CloseCodes.Regular))
      case UserHandlerErredOut(e) =>
        log.error(e, s"Websocket handler failed with ${e.getMessage}")
        sendCloseFrameAndAwaitPeer(FrameEvent.closeFrame(Protocol.CloseCodes.UnexpectedCondition, "internal error"))
      case Tick => pull(in) // ignore
    }

  /** Switches `in` to a new `WaitingForPeerCloseFrame` handler and then pushes `closeFrame` to `out`. */
  private def sendCloseFrameAndAwaitPeer(closeFrame: FrameStart): Unit = {
    setHandler(in, new WaitingForPeerCloseFrame())
    push(out, closeFrame)
  }

  /** Upstream finished while idle: a regular close frame is queued up as the last element. */
  override def onUpstreamFinish(): Unit = {
    val regularClose = FrameEvent.closeFrame(Protocol.CloseCodes.Regular)
    becomeSendOutCloseFrameAndComplete(regularClose)
    absorbTermination()
  }
}
/**
* peer has closed, we want to wait for user handler to close as well
*/
private class WaitingForUserHandlerClosed(closeFrame: FrameStart) extends InHandler {
  // `closeFrame` is held back until the user handler has completed or failed; frames arriving
  // in the meantime are still passed on to `out`, every other event is dropped.
  def onPush() = {
    val event = grab(in)
    event match {
      case UserHandlerCompleted =>
        sendOutLastFrame()
      case UserHandlerErredOut(e) =>
        log.error(e, s"Websocket handler failed while waiting for handler completion with ${e.getMessage}")
        sendOutLastFrame()
      case start: FrameStart =>
        push(out, start)
      case _ =>
        pull(in) // ignore
    }
  }

  /**
   * Pushes the stored close frame. On the server side the stage completes right afterwards;
   * otherwise `in` is switched to a new `WaitingForTransportClose` handler before the push.
   */
  private def sendOutLastFrame(): Unit = {
    if (!serverSide) setHandler(in, new WaitingForTransportClose())
    push(out, closeFrame)
    if (serverSide) completeStage()
  }

  // upstream finishing while this handler is installed is reported downstream as a failure
  override def onUpstreamFinish(): Unit = {
    val prematureCompletion = new IllegalStateException("Mustn't complete before user has completed")
    fail(out, prematureCompletion)
  }
}
/**
* we have sent out our close frame and wait for the peer to send its close frame
*/
private class WaitingForPeerCloseFrame(deadline: Deadline = closeDeadline()) extends InHandler
    with ProcotolExceptionHandling {
  // Only two events end this state: a `Tick` arriving once `deadline` is overdue, or the
  // peer's close event. Everything else is dropped and the next element is requested.
  override def onPush() =
    grab(in) match {
      case Tick if deadline.isOverdue() =>
        if (log.isDebugEnabled)
          log.debug(s"Peer did not acknowledge CLOSE frame after ${_closeTimeout}, closing underlying connection now.")
        completeStage()
      case Tick =>
        pull(in)
      case PeerClosed(_, _) if serverSide =>
        completeStage()
      case PeerClosed(_, _) =>
        // not server side: keep going with the same deadline in the WaitingForTransportClose state
        setHandler(in, new WaitingForTransportClose(deadline))
        pull(in)
      case _ =>
        pull(in) // ignore
    }
}
/**
* Both sides have sent their close frames, server should close the connection first
*/
private class WaitingForTransportClose(deadline: Deadline = closeDeadline()) extends InHandler
    with ProcotolExceptionHandling {

  /**
   * Drops every incoming element and requests the next one. The only exception is a `Tick`
   * that arrives once `deadline` is overdue: in that case the stage is completed.
   */
  override def onPush() = {
    grab(in) match {
      case Tick =>
        if (deadline.isOverdue()) {
          if (log.isDebugEnabled) log.debug(
            s"Peer did not close TCP connection after sending CLOSE frame after ${_closeTimeout}, closing underlying connection now.")
          completeStage()
        } else pull(in) // deadline not reached yet, keep waiting
      case _ => pull(in) // ignore
    }
  }
}
/** If upstream has already failed we just wait to be able to deliver our close frame and complete */
private class SendOutCloseFrameAndComplete(closeFrame: FrameStart) extends InHandler with OutHandler
    with ProcotolExceptionHandling {

  // This handler is installed from the upstream termination callbacks, so no further element
  // is expected on `in`; if one shows up anyway, `out` is failed.
  override def onPush() = {
    val unexpectedPush = new IllegalStateException("Didn't expect push after completion")
    fail(out, unexpectedPush)
  }

  // As soon as downstream asks for an element, the close frame is delivered and the stage completes.
  override def onPull(): Unit = {
    push(out, closeFrame)
    completeStage()
  }

  override def onUpstreamFinish(): Unit = {
    absorbTermination()
  }
}
/**
 * Installs one shared `SendOutCloseFrameAndComplete` instance as the handler of both `in`
 * and `out`.
 *
 * @param frameStart the close frame that handler pushes on the next pull before completing the stage
 */
def becomeSendOutCloseFrameAndComplete(frameStart: FrameStart): Unit = {
  val closingHandler = new SendOutCloseFrameAndComplete(frameStart)
  // the same instance acts as InHandler and OutHandler
  setHandler(in, closingHandler)
  setHandler(out, closingHandler)
}
/** We handle [[ProtocolException]] in a special way (by terminating with a ProtocolError) */
private trait ProcotolExceptionHandling extends InHandler {

  /**
   * Reacts to an upstream failure.
   *
   * If `cause` is a `ProtocolException`, or has one anywhere in its chain of causes, the stage
   * switches to sending out a `ProtocolError` close frame and completing. Every other failure
   * is handed to the inherited `onUpstreamFailure` implementation.
   *
   * @param cause the failure signalled by upstream
   */
  override final def onUpstreamFailure(cause: Throwable): Unit =
    if (isCausedByProtocolException(cause)) {
      becomeSendOutCloseFrameAndComplete(FrameEvent.closeFrame(Protocol.CloseCodes.ProtocolError))
      absorbTermination()
    } else {
      // Hand over the failure exactly as it was received. Unwrapping is only needed to look for
      // a ProtocolException; failing with the innermost cause instead would throw away the
      // wrapping exception(s) and their context.
      super.onUpstreamFailure(cause)
    }

  /** Walks the cause chain starting at `t` and reports whether it contains a `ProtocolException`. */
  @tailrec private def isCausedByProtocolException(t: Throwable): Boolean =
    t match {
      case null                 => false // reached the end of the chain
      case _: ProtocolException => true
      case other                => isCausedByProtocolException(other.getCause)
    }
}
// init handlers
/** Initial handler for `out`: every pull from downstream is answered by pulling `in`. */
private object ForwardDemandToUpstream extends OutHandler {
  override def onPull(): Unit = pull(in)
}
setHandler(in, Idle)
setHandler(out, ForwardDemandToUpstream)
}