override def createLogicAndMaterializedValue()

in http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/ServerTerminator.scala [204:378]


  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, ServerTerminator) = {
    val triggerTermination = Promise[FiniteDuration => Future[HttpTerminated]]() // result here means termination of this connection has completed

    // responsible for terminating this connection
    val selfTerminator = new ConnectionTerminator(triggerTermination)

    val logic = new TimerGraphStageLogic(shape) with StageLogging {
      override protected def logSource: Class[_] = classOf[GracefulTerminatorStage]

      // this promise will be completed once our termination is complete;
      // e.g. we replied with "go away" to pending requests, and no new ones were incoming etc
      val terminationOfConnectionDone = Promise[HttpConnectionTerminated]()

      // error callback, in case an asynchronous operation needs to pipe back a failure back to this stage
      // this could happen during draining of incoming http requests during termination phase for example
      lazy val failureCallback: AsyncCallback[Throwable] = getAsyncCallback((ex: Throwable) => failStage(ex))

      // true, if a request was delivered to the user-handler, and no response was sent yet
      // in that case, if the termination timeout triggers, we will need to render a synthetic response to the client.
      var pendingUserHandlerResponse: Boolean = false

      override def preStart(): Unit = {
        val terminateSignal = getAsyncCallback[FiniteDuration] { deadline =>
          log.debug("[terminator] Initializing termination of server, deadline: {}", PrettyDuration.format(deadline))
          installTerminationHandlers(deadline.fromNow)

          scheduleOnce(TerminationDeadlineTimerKey, deadline)
        }

        // force initialization of lazy val:
        val _ = failureCallback

        // this way we expose the termination signal invocation to the external world, in a type safe fashion
        triggerTermination.success { d =>
          terminateSignal.invoke(d)
          terminationOfConnectionDone.future // will be completed once termination has completed (in postStop)
        }
      }

      setHandler(fromUser,
        new InHandler {
          override def onPush(): Unit = {
            val response = grab(fromUser)
            pendingUserHandlerResponse = false
            push(toNet, response)
          }

          override def onUpstreamFinish(): Unit = {
            // don't finish the whole bidi stage, just propagate the completion:
            complete(toNet)
          }
        })
      setHandler(toUser,
        new OutHandler {
          override def onPull(): Unit = {
            pull(fromNet)
          }
        })
      setHandler(fromNet,
        new InHandler {
          override def onPush(): Unit = {
            val request = grab(fromNet)

            pendingUserHandlerResponse = true
            push(toUser, request)
          }

          override def onUpstreamFinish(): Unit = {
            // don't finish the whole bidi stage, just propagate the completion:
            complete(toUser)
          }
        })
      setHandler(toNet,
        new OutHandler {
          override def onPull(): Unit = {
            pull(fromUser)
          }
        })

      def installTerminationHandlers(deadline: Deadline): Unit = {
        // when no inflight requests, fail stage right away, could probably be a complete
        // when https://github.com/akka/akka-http/issues/3209 is fixed
        if (!pendingUserHandlerResponse) failStage(new ServerTerminationDeadlineReached)

        setHandler(fromUser,
          new InHandler {
            override def onPush(): Unit = {
              val overdue = deadline.isOverdue()
              val response =
                if (overdue) {
                  log.warning("Terminating server ({}), discarding user reply since arrived after deadline expiration",
                    formatTimeLeft(deadline))
                  settings.terminationDeadlineExceededResponse
                } else grab(fromUser)

              pendingUserHandlerResponse = false

              // send response to pending in-flight request with Connection: close, and complete stage
              emit(toNet,
                response.withHeaders(Connection("close") +: response.headers.filterNot(_.is(Connection.lowercaseName))),
                () => completeStage())
            }
          })

        // once termination deadline hits, we stop pulling from network
        setHandler(toUser,
          new OutHandler {
            override def onPull(): Unit = {
              // if (deadline.hasTimeLeft()) // we pull always as we want to reply errors to everyone
              pull(fromNet)
            }
          })

        setHandler(fromNet,
          new InHandler {
            override def onPush(): Unit = {
              val request = grab(fromNet)
              log.warning(
                "Terminating server ({}), attempting to send termination reply to incoming [{} {}]",
                formatTimeLeft(deadline), request.method, request.uri.path)

              // on purpose discard all incoming bytes for requests
              // could discard with the deadline.timeLeft completion timeout, but not necessarily needed
              request.entity.discardBytes()(interpreter.subFusingMaterializer).future.onComplete {
                case Success(_)  => // ignore
                case Failure(ex) =>
                  // we do want to cause this failure to fail the termination eagerly
                  failureCallback.invoke(ex)
              }(interpreter.materializer.executionContext)

              // we can reply right away with an termination response since user handler will never emit a response anymore
              push(toNet, settings.terminationDeadlineExceededResponse.withHeaders(Connection("close")))
              completeStage()
            }
          })

        // we continue pulling from user, to make sure we'd get the "final user reply" that may be sent during termination
        setHandler(toNet,
          new OutHandler {
            override def onPull(): Unit = {
              if (pendingUserHandlerResponse) {
                if (isAvailable(fromUser)) pull(fromUser)
              }
            }
          })
      }

      override def postStop(): Unit = {
        terminationOfConnectionDone.success(Http.HttpConnectionTerminated)
      }

      override protected def onTimer(timerKey: Any): Unit = timerKey match {
        case TerminationDeadlineTimerKey =>
          val ex = new ServerTerminationDeadlineReached
          if (pendingUserHandlerResponse) {
            // sending the reply here is a "nice to try", but the stage failure will likely overtake it and terminate the connection first
            emit(toNet, settings.terminationDeadlineExceededResponse, () => failStage(ex))
          } else {
            failStage(ex)
          }

        case unexpected =>
          // should not happen
          throw new IllegalArgumentException(s"Unexpected timer key [$unexpected] in ${getClass.getName}!")
      }

      def formatTimeLeft(d: Deadline): String = {
        val left = d.timeLeft
        if (left.toMillis < 0) "deadline exceeded"
        else PrettyDuration.format(left) + " remaining"
      }
    }

    logic -> selfTerminator
  }