in http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/ServerTerminator.scala [204:378]
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, ServerTerminator) = {
// Completed in `preStart` with the function that triggers termination of this connection: that function takes
// the termination timeout and returns a future for the completion of the termination.
val triggerTermination = Promise[FiniteDuration => Future[HttpTerminated]]() // result here means termination of this connection has completed
// responsible for terminating this connection; handed out below as the materialized value of the stage
val selfTerminator = new ConnectionTerminator(triggerTermination)
val logic = new TimerGraphStageLogic(shape) with StageLogging {
// log source used by StageLogging for this logic: the GracefulTerminatorStage class
override protected def logSource: Class[_] = classOf[GracefulTerminatorStage]
// this promise will be completed once our termination is complete;
// e.g. we replied with "go away" to pending requests, and no new ones were incoming etc.
// It is completed in `postStop`, and its future is what the termination trigger hands back to callers.
val terminationOfConnectionDone = Promise[HttpConnectionTerminated]()
// error callback, in case an asynchronous operation needs to pipe a failure back to this stage;
// this could happen during draining of incoming http requests during termination phase for example.
// Declared lazy and explicitly forced in `preStart`.
lazy val failureCallback: AsyncCallback[Throwable] = getAsyncCallback((ex: Throwable) => failStage(ex))
// true, if a request was delivered to the user-handler, and no response was sent yet
// in that case, if the termination timeout triggers, we will need to render a synthetic response to the client.
// Set when a request is pushed to `toUser`, cleared when a response is taken from `fromUser`.
var pendingUserHandlerResponse: Boolean = false
// Creates the async callback that starts termination and publishes the trigger function through
// `triggerTermination`, so that code outside of the stage can initiate termination.
override def preStart(): Unit = {
  // invoked (asynchronously) with the termination timeout once termination has been requested
  val startTermination = getAsyncCallback[FiniteDuration] { timeout =>
    log.debug("[terminator] Initializing termination of server, deadline: {}", PrettyDuration.format(timeout))
    // switch all ports over to the termination-mode handlers, using an absolute deadline ...
    installTerminationHandlers(timeout.fromNow)
    // ... and arm the timer that enforces this deadline
    scheduleOnce(TerminationDeadlineTimerKey, timeout)
  }

  // force initialization of the lazy failure callback now
  val _ = failureCallback

  // expose the termination signal invocation to the external world, in a type safe fashion:
  // calling the function triggers termination and hands back the "termination done" future
  triggerTermination.success { timeout =>
    startTermination.invoke(timeout)
    terminationOfConnectionDone.future // will be completed once termination has completed (in postStop)
  }
}
// Pass-through wiring that is active until termination is triggered.

// responses from the user handler go to the network; once one is forwarded nothing is pending anymore
setHandler(fromUser,
  new InHandler {
    override def onPush(): Unit = {
      val userResponse = grab(fromUser)
      pendingUserHandlerResponse = false
      push(toNet, userResponse)
    }

    // don't finish the whole bidi stage, just propagate the completion:
    override def onUpstreamFinish(): Unit = complete(toNet)
  })

// demand from the user handler is turned into demand towards the network
setHandler(toUser,
  new OutHandler {
    override def onPull(): Unit = pull(fromNet)
  })

// requests from the network go to the user handler; from then on a response is pending
setHandler(fromNet,
  new InHandler {
    override def onPush(): Unit = {
      val incomingRequest = grab(fromNet)
      pendingUserHandlerResponse = true
      push(toUser, incomingRequest)
    }

    // don't finish the whole bidi stage, just propagate the completion:
    override def onUpstreamFinish(): Unit = complete(toUser)
  })

// demand from the network is turned into demand towards the user handler
setHandler(toNet,
  new OutHandler {
    override def onPull(): Unit = pull(fromUser)
  })
// Switches the stage into termination mode by replacing the handlers of all four ports.
//
// In termination mode:
//  - the (at most one) pending user reply is sent with `Connection: close`, after which the stage completes,
//  - any request still arriving from the network is answered with the configured
//    `terminationDeadlineExceededResponse` (also with `Connection: close`), after which the stage completes.
//
// `deadline` is the absolute point in time after which a late user reply is replaced by the configured
// `terminationDeadlineExceededResponse`.
def installTerminationHandlers(deadline: Deadline): Unit = {
  // when no inflight requests, fail stage right away, could probably be a complete
  // when https://github.com/akka/akka-http/issues/3209 is fixed
  // (the handlers below are still installed afterwards, exactly as before)
  if (!pendingUserHandlerResponse) failStage(new ServerTerminationDeadlineReached)

  setHandler(fromUser,
    new InHandler {
      override def onPush(): Unit = {
        val overdue = deadline.isOverdue()
        val response =
          if (overdue) {
            log.warning("Terminating server ({}), discarding user reply since arrived after deadline expiration",
              formatTimeLeft(deadline))
            settings.terminationDeadlineExceededResponse
          } else grab(fromUser)
        pendingUserHandlerResponse = false
        // send response to pending in-flight request with Connection: close, and complete stage
        emit(toNet,
          response.withHeaders(Connection("close") +: response.headers.filterNot(_.is(Connection.lowercaseName))),
          () => completeStage())
      }
    })

  // we always keep pulling from the network, as we want to reply with errors to everyone
  setHandler(toUser,
    new OutHandler {
      override def onPull(): Unit = pull(fromNet)
    })

  setHandler(fromNet,
    new InHandler {
      override def onPush(): Unit = {
        val request = grab(fromNet)
        log.warning(
          "Terminating server ({}), attempting to send termination reply to incoming [{} {}]",
          formatTimeLeft(deadline), request.method, request.uri.path)

        // on purpose discard all incoming bytes for requests
        // could discard with the deadline.timeLeft completion timeout, but not necessarily needed
        request.entity.discardBytes()(interpreter.subFusingMaterializer).future.onComplete {
          case Success(_) => // ignore
          case Failure(ex) =>
            // we do want to cause this failure to fail the termination eagerly
            failureCallback.invoke(ex)
        }(interpreter.materializer.executionContext)

        // we can reply right away with a termination response since user handler will never emit a response anymore.
        // `emit` (rather than a bare `push`) is used since there may be no demand on `toNet` at this point, and
        // pushing without demand is illegal. Headers configured on the termination response are kept, only the
        // `Connection` header is replaced, same as for the user reply above.
        val terminationReply = settings.terminationDeadlineExceededResponse
        emit(toNet,
          terminationReply.withHeaders(
            Connection("close") +: terminationReply.headers.filterNot(_.is(Connection.lowercaseName))),
          () => completeStage())
      }
    })

  // we continue pulling from user, to make sure we'd get the "final user reply" that may be sent during termination
  setHandler(toNet,
    new OutHandler {
      override def onPull(): Unit = {
        // Only pull when a reply is still expected, the port has no outstanding pull and is still open.
        // (Guarding with `isAvailable(fromUser)` would only pull while an un-grabbed element is present,
        // i.e. never request the final reply and, if it ever did pull, drop that element.)
        if (pendingUserHandlerResponse && !hasBeenPulled(fromUser) && !isClosed(fromUser)) pull(fromUser)
      }
    })
}
// Once the stage has stopped, termination of this connection is done: complete the promise whose future
// was handed out to whoever triggered the termination.
override def postStop(): Unit =
  terminationOfConnectionDone.success(Http.HttpConnectionTerminated)
// Handles the termination deadline timer: the deadline is hard, so the stage is failed unconditionally.
//
// If a user reply is still pending, the configured `terminationDeadlineExceededResponse` is pushed first, but
// only when there is demand on `toNet`. Waiting for demand (e.g. via `emit`) would postpone the failure until
// the network side pulls again, which may never happen, leaving the connection open past the deadline.
//
// Throws IllegalArgumentException for any timer key other than `TerminationDeadlineTimerKey`.
override protected def onTimer(timerKey: Any): Unit = timerKey match {
  case TerminationDeadlineTimerKey =>
    // sending the reply here is a "nice to try", but the stage failure will likely overtake it and terminate the connection first
    if (pendingUserHandlerResponse && isAvailable(toNet))
      push(toNet, settings.terminationDeadlineExceededResponse)
    failStage(new ServerTerminationDeadlineReached)

  case unexpected =>
    // should not happen
    throw new IllegalArgumentException(s"Unexpected timer key [$unexpected] in ${getClass.getName}!")
}
// Renders the time left until `d` for log messages: "deadline exceeded" once the deadline has passed,
// otherwise the pretty-printed remaining time followed by " remaining".
def formatTimeLeft(d: Deadline): String = {
  // read the clock only once, so the check and the rendered value agree
  val left = d.timeLeft
  // compare on nanoseconds: `toMillis` truncates towards zero, so a deadline that is overdue by less than
  // a millisecond would otherwise be rendered as a (negative) remaining time
  if (left.toNanos < 0) "deadline exceeded"
  else s"${PrettyDuration.format(left)} remaining"
}
}
logic -> selfTerminator
}