override def initialAttributes = Attributes.name()

in http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala [429:656]


    override def initialAttributes = Attributes.name("ControllerStage")

    val shape = new BidiShape(requestParsingIn, requestPrepOut, httpResponseIn, responseCtxOut)

    override private[pekko] def createLogicAndMaterializedValue(inheritedAttributes: Attributes,
        outerMaterializer: Materializer) =
      new GraphStageLogic(shape) {
        val parsingErrorHandler: ParsingErrorHandler =
          settings.parsingErrorHandlerInstance(ActorMaterializerHelper.downcast(outerMaterializer).system)
        val pullHttpResponseIn = () => tryPull(httpResponseIn)
        var openRequests = immutable.Queue[RequestStart]()
        var oneHundredContinueResponsePending = false
        var pullSuppressed = false
        var messageEndPending = false

        setHandler(requestParsingIn,
          new InHandler {
            def onPush(): Unit =
              grab(requestParsingIn) match {
                case r: RequestStart =>
                  openRequests = openRequests.enqueue(r)
                  messageEndPending = r.createEntity.isInstanceOf[StreamedEntityCreator[_, _]]
                  val rs = if (r.expect100Continue) {
                    r.createEntity match {
                      case StrictEntityCreator(HttpEntity.Strict(_, _)) =>
                        // This covers two cases:
                        // - Either: The strict entity got all its data send already, so no need to wait for more data
                        // - Or: The strict entity contains no data (Content-Length header value was 0 or it did not exist), the client will not send any data
                        r
                      case _ =>
                        oneHundredContinueResponsePending = true
                        r.copy(createEntity = with100ContinueTrigger(r.createEntity))
                    }
                  } else r
                  push(requestPrepOut, rs)
                case MessageEnd =>
                  messageEndPending = false
                  push(requestPrepOut, MessageEnd)
                case MessageStartError(status, info)                                   => finishWithIllegalRequestError(status, info)
                case x: EntityStreamError if messageEndPending && openRequests.isEmpty =>
                  // client terminated the connection after receiving an early response to 100-continue
                  completeStage()
                case x =>
                  push(requestPrepOut, x)
              }
            override def onUpstreamFinish() =
              if (openRequests.isEmpty) completeStage()
              else complete(requestPrepOut)
          })

        setHandler(requestPrepOut,
          new OutHandler {
            def onPull(): Unit =
              if (oneHundredContinueResponsePending) pullSuppressed = true
              else if (!hasBeenPulled(requestParsingIn)) pull(requestParsingIn)
            override def onDownstreamFinish(): Unit =
              if (openRequests.isEmpty) completeStage()
              else failStage(
                new IllegalStateException("User handler flow was cancelled with ongoing request") with NoStackTrace)
          })

        setHandler(httpResponseIn,
          new InHandler {
            def onPush(): Unit = {
              val requestStart = openRequests.head
              openRequests = openRequests.tail

              val response0 = grab(httpResponseIn)
              val response =
                if (response0.entity.isStrict) response0 // response stream cannot fail
                else response0.mapEntity { e =>
                  val (newEntity, fut) = HttpEntity.captureTermination(e)
                  fut.onComplete {
                    case Failure(ex) =>
                      log.error(ex,
                        s"Response stream for [${requestStart.debugString}] failed with '${ex.getMessage}'. Aborting connection.")
                    case _ => // ignore
                  }(ExecutionContexts.sameThreadExecutionContext)
                  newEntity
                }

              val isEarlyResponse = messageEndPending && openRequests.isEmpty
              if (isEarlyResponse && response.status.isSuccess)
                log.warning(
                  s"Sending an 2xx 'early' response before end of request for ${requestStart.uri} received... " +
                  "Note that the connection will be closed after this response. Also, many clients will not read early responses! " +
                  "Consider only issuing this response after the request data has been completely read!")
              val forceClose =
                (requestStart.expect100Continue && oneHundredContinueResponsePending) ||
                (isClosed(requestParsingIn) && openRequests.isEmpty) ||
                isEarlyResponse

              val close =
                if (forceClose) CloseRequested.ForceClose
                else if (requestStart.closeRequested) CloseRequested.RequestAskedForClosing
                else CloseRequested.Unspecified

              emit(responseCtxOut,
                ResponseRenderingContext(response, requestStart.method, requestStart.protocol, close),
                pullHttpResponseIn)
              if (!isClosed(requestParsingIn) && close.shouldClose && requestStart.expect100Continue)
                maybePullRequestParsingIn()
            }
            override def onUpstreamFinish() =
              if (openRequests.isEmpty && isClosed(requestParsingIn)) completeStage()
              else complete(responseCtxOut)
            override def onUpstreamFailure(ex: Throwable): Unit =
              ex match {
                case EntityStreamException(errorInfo) =>
                  // the application has forwarded a request entity stream error to the response stream
                  finishWithIllegalRequestError(StatusCodes.BadRequest, errorInfo)

                case EntityStreamSizeException(limit, contentLength) =>
                  val summary = contentLength match {
                    case Some(cl) => s"Request Content-Length of $cl bytes exceeds the configured limit of $limit bytes"
                    case None =>
                      s"Aggregated data length of request entity exceeds the configured limit of $limit bytes"
                  }
                  val info =
                    ErrorInfo(summary, "Consider increasing the value of pekko.http.server.parsing.max-content-length")
                  finishWithIllegalRequestError(StatusCodes.PayloadTooLarge, info)

                case IllegalUriException(errorInfo) =>
                  finishWithIllegalRequestError(StatusCodes.BadRequest, errorInfo)

                case ex: ServerTerminationDeadlineReached => failStage(ex)

                case NonFatal(e) =>
                  log.error(e, "Internal server error, sending 500 response")
                  emitErrorResponse(HttpResponse(StatusCodes.InternalServerError))
              }
          })

        setHandler(responseCtxOut,
          new OutHandler {
            override def onPull() = {
              pull(httpResponseIn)
              // after the initial pull here we only ever pull after having emitted in `onPush` of `httpResponseIn`
              setHandler(responseCtxOut, GraphStageLogic.EagerTerminateOutput)
            }
          })

        def finishWithIllegalRequestError(status: StatusCode, info: ErrorInfo): Unit = {
          val errorResponse = JavaMapping.toScala(parsingErrorHandler.handle(status, info, log, settings))
          emitErrorResponse(errorResponse)
        }

        def emitErrorResponse(response: HttpResponse): Unit =
          emit(responseCtxOut, ResponseRenderingContext(response, closeRequested = CloseRequested.ForceClose),
            () => completeStage())

        def maybePullRequestParsingIn(): Unit =
          if (pullSuppressed) {
            pullSuppressed = false
            pull(requestParsingIn)
          }

        /**
         * The `Expect: 100-continue` header has a special status in HTTP.
         * It allows the client to send an `Expect: 100-continue` header with the request and then pause request sending
         * (i.e. hold back sending the request entity). The server reads the request headers, determines whether it wants to
         * accept the request and responds with
         *
         * - `417 Expectation Failed`, if it doesn't support the `100-continue` expectation
         * (or if the `Expect` header contains other, unsupported expectations).
         * - a `100 Continue` response,
         * if it is ready to accept the request entity and the client should go ahead with sending it
         * - a final response (like a 4xx to signal some client-side error
         * (e.g. if the request entity length is beyond the configured limit) or a 3xx redirect)
         *
         * Only if the client receives a `100 Continue` response from the server is it allowed to continue sending the request
         * entity. In this case it will receive another response after having completed request sending.
         * So this special feature breaks the normal "one request - one response" logic of HTTP!
         * It therefore requires special handling in all HTTP stacks (client- and server-side).
         *
         * For us this means:
         *
         * - on the server-side:
         * After having read a `Expect: 100-continue` header with the request we package up an `HttpRequest` instance and send
         * it through to the application. Only when (and if) the application then requests data from the entity stream do we
         * send out a `100 Continue` response and continue reading the request entity.
         * The application can therefore determine itself whether it wants the client to send the request entity
         * by deciding whether to look at the request entity data stream or not.
         * If the application sends a response *without* having looked at the request entity the client receives this
         * response *instead of* the `100 Continue` response and the server closes the connection afterwards.
         *
         * - on the client-side:
         * If the user adds a `Expect: 100-continue` header to the request we need to hold back sending the entity until
         * we've received a `100 Continue` response.
         */
        val emit100ContinueResponse =
          getAsyncCallback[Unit] { _ =>
            oneHundredContinueResponsePending = false
            emit(responseCtxOut, ResponseRenderingContext(HttpResponse(StatusCodes.Continue)))
            maybePullRequestParsingIn()
          }

        case object OneHundredContinueStage extends GraphStage[FlowShape[ParserOutput, ParserOutput]] {
          val in: Inlet[ParserOutput] = Inlet("OneHundredContinueStage.in")
          val out: Outlet[ParserOutput] = Outlet("OneHundredContinueStage.out")
          override val shape: FlowShape[ParserOutput, ParserOutput] = FlowShape(in, out)

          override def initialAttributes = Attributes.name("expect100continueTrigger")

          override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
            new GraphStageLogic(shape) with InHandler with OutHandler {
              private var oneHundredContinueSent = false

              override def onPush(): Unit = push(out, grab(in))
              override def onPull(): Unit = {
                if (!oneHundredContinueSent) {
                  oneHundredContinueSent = true
                  emit100ContinueResponse.invoke(())
                }
                pull(in)
              }

              setHandlers(in, out, this)
            }
        }

        def with100ContinueTrigger[T <: ParserOutput](createEntity: EntityCreator[T, RequestEntity]) =
          StreamedEntityCreator {
            createEntity.compose[Source[T, NotUsed]] {
              _.via(OneHundredContinueStage.asInstanceOf[GraphStage[FlowShape[T, T]]])
            }
          }
      } -> NotUsed