def renderer: Flow[ResponseRenderingContext, ResponseRenderingOutput, NotUsed] = Flow.fromGraph()

in http-core/src/main/scala/org/apache/pekko/http/impl/engine/rendering/HttpResponseRendererFactory.scala [57:323]


  def renderer: Flow[ResponseRenderingContext, ResponseRenderingOutput, NotUsed] = Flow.fromGraph(HttpResponseRenderer)

  object HttpResponseRenderer extends GraphStage[FlowShape[ResponseRenderingContext, ResponseRenderingOutput]] {
    val in = Inlet[ResponseRenderingContext]("HttpResponseRenderer.in")
    val out = Outlet[ResponseRenderingOutput]("HttpResponseRenderer.out")
    val shape: FlowShape[ResponseRenderingContext, ResponseRenderingOutput] = FlowShape(in, out)

    def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) {
        var closeMode: CloseMode = DontClose // signals what to do after the current response
        def close: Boolean = closeMode != DontClose
        def closeIf(cond: Boolean): Unit = if (cond) closeMode = CloseConnection
        var transferSink: Option[SubSinkInlet[ByteString]] = None
        def transferring: Boolean = transferSink.isDefined

        setHandler(in,
          new InHandler {
            override def onPush(): Unit =
              render(grab(in)) match {
                case Strict(outElement) =>
                  push(out, outElement)
                  if (close) completeStage()
                case HeadersAndStreamedEntity(headerData, outStream) =>
                  try transfer(headerData, outStream)
                  catch {
                    case NonFatal(e) =>
                      log.error(e,
                        s"Rendering of response failed because response entity stream materialization failed with '${e.getMessage}'. Sending out 500 response instead.")
                      push(out,
                        render(ResponseRenderingContext(HttpResponse(500,
                          entity = StatusCodes.InternalServerError.defaultMessage))).asInstanceOf[Strict].bytes)
                  }
              }

            override def onUpstreamFinish(): Unit =
              if (transferring) closeMode = CloseConnection
              else completeStage()

            override def onUpstreamFailure(ex: Throwable): Unit = {
              stopTransfer()
              failStage(ex)
            }
          })
        private val waitForDemandHandler = new OutHandler {
          def onPull(): Unit = if (!hasBeenPulled(in)) tryPull(in)
        }
        def stopTransfer(): Unit = {
          setHandler(out, waitForDemandHandler)
          if (isAvailable(out) && !hasBeenPulled(in)) tryPull(in)
          transferSink.foreach(_.cancel())
          transferSink = None
        }

        setHandler(out, waitForDemandHandler)
        def transfer(headerData: ByteString, outStream: Source[ByteString, Any]): Unit = {
          val sinkIn = new SubSinkInlet[ByteString]("RenderingSink")
          transferSink = Some(sinkIn)

          sinkIn.setHandler(new InHandler {
            override def onPush(): Unit = push(out, ResponseRenderingOutput.HttpData(sinkIn.grab()))
            override def onUpstreamFinish(): Unit =
              if (close) completeStage()
              else stopTransfer()
          })

          var headersSent = false
          def sendHeaders(): Unit = {
            push(out, ResponseRenderingOutput.HttpData(headerData))
            headersSent = true
          }
          setHandler(out,
            new OutHandler {
              override def onPull(): Unit =
                if (!headersSent) sendHeaders()
                else sinkIn.pull()
              override def onDownstreamFinish(cause: Throwable): Unit = {
                completeStage()
                stopTransfer()
              }
            })

          try {
            outStream.runWith(sinkIn.sink)(interpreter.subFusingMaterializer)
            if (isAvailable(out)) sendHeaders()
          } catch {
            case NonFatal(e) =>
              stopTransfer()
              throw e
          }
        }

        def render(ctx: ResponseRenderingContext): StrictOrStreamed = {
          val r = new ByteArrayRendering(responseHeaderSizeHint, log.warning)

          import ctx.response._
          val noEntity = entity.isKnownEmpty || ctx.requestMethod == HttpMethods.HEAD

          def renderStatusLine(): Unit =
            protocol match {
              case `HTTP/1.1` => if (status eq StatusCodes.OK) r ~~ DefaultStatusLineBytes
                else r ~~ StatusLineStartBytes ~~ status ~~ CrLf
              case `HTTP/1.0` => r ~~ protocol ~~ ' ' ~~ status ~~ CrLf
              case other      => throw new IllegalStateException(s"Unexpected protocol '$other'")
            }

          def render(h: HttpHeader) = r ~~ h

          def mustRenderTransferEncodingChunkedHeader =
            entity.isChunked && (!entity.isKnownEmpty || ctx.requestMethod == HttpMethods.HEAD) && (ctx.requestProtocol == `HTTP/1.1`)

          def renderHeaders(headers: immutable.Seq[HttpHeader], alwaysClose: Boolean = false): Unit = {
            var connHeader: Connection = null
            var serverSeen: Boolean = false
            var transferEncodingSeen: Boolean = false
            var dateSeen: Boolean = false

            val it = headers.iterator
            while (it.hasNext)
              it.next() match {
                case x: Server =>
                  render(x)
                  serverSeen = true

                case x: Date =>
                  render(x)
                  dateSeen = true

                case x: `Content-Length` =>
                  suppressionWarning(log, x,
                    "explicit `Content-Length` header is not allowed. Use the appropriate HttpEntity subtype.")

                case x: `Content-Type` =>
                  suppressionWarning(log, x,
                    "explicit `Content-Type` header is not allowed. Set `HttpResponse.entity.contentType` instead.")

                case x: `Transfer-Encoding` =>
                  x.withChunkedPeeled match {
                    case None =>
                      suppressionWarning(log, x)
                    case Some(te) =>
                      // if the user applied some custom transfer-encoding we need to keep the header
                      render(if (mustRenderTransferEncodingChunkedHeader) te.withChunked else te)
                      transferEncodingSeen = true
                  }

                case x: Connection =>
                  connHeader = if (connHeader eq null) x else Connection(x.tokens ++ connHeader.tokens)

                case x: CustomHeader =>
                  if (x.renderInResponses) render(x)

                case x: RawHeader
                    if (x.is("content-type")) || (x.is("content-length")) || (x.is("transfer-encoding")) ||
                    (x.is("date")) || (x.is("server")) || (x.is("connection")) =>
                  suppressionWarning(log, x, "illegal RawHeader")

                case x =>
                  if (x.renderInResponses) render(x)
                  else log.warning("HTTP header '{}' is not allowed in responses", x)
              }

            if (!serverSeen) renderDefaultServerHeader(r)
            if (!dateSeen) r ~~ dateHeaderRendering.renderHeaderBytes()

            // Do we close the connection after this response?
            closeIf {
              // if we are prohibited to keep-alive by the spec
              alwaysClose ||
              // if the controller asked for closing (error, early response, etc. overrides anything
              ctx.closeRequested.wasForced ||
              // if the client wants to close and the response doesn't override
              (ctx.closeRequested.shouldClose && ((connHeader eq null) || !connHeader.hasKeepAlive)) ||
              // if the application wants to close explicitly
              (protocol match {
                case `HTTP/1.1` => (connHeader ne null) && connHeader.hasClose
                case `HTTP/1.0` =>
                  if (connHeader eq null) ctx.requestProtocol == `HTTP/1.1` else !connHeader.hasKeepAlive
                case other => throw new IllegalStateException(s"Unexpected protocol '$other'")
              })
            }

            // Do we render an explicit Connection header?
            val renderConnectionHeader =
              protocol == `HTTP/1.0` && !close || protocol == `HTTP/1.1` && close || // if we don't follow the default behavior
              close != ctx.closeRequested.shouldClose || // if we override the client's closing request
              protocol != ctx.requestProtocol // if we reply with a mismatching protocol (let's be very explicit in this case)

            if (renderConnectionHeader)
              r ~~ Connection ~~ (if (close) CloseBytes else KeepAliveBytes) ~~ CrLf
            else if (connHeader != null && connHeader.hasUpgrade) {
              r ~~ connHeader
              HttpHeader.fastFind(classOf[UpgradeToOtherProtocolResponseHeader], headers) match {
                case OptionVal.Some(header) => closeMode = SwitchToOtherProtocol(header.handler)
                case _                      => // nothing to do here...
              }
            }
            if (mustRenderTransferEncodingChunkedHeader && !transferEncodingSeen)
              r ~~ `Transfer-Encoding` ~~ ChunkedBytes ~~ CrLf
          }

          def renderContentLengthHeader(contentLength: Long) =
            if (status.allowsEntity) r ~~ ContentLengthBytes ~~ contentLength ~~ CrLf else r

          def headersAndEntity(entityBytes: => Source[ByteString, Any]): StrictOrStreamed =
            if (noEntity) {
              entityBytes.runWith(Sink.cancelled)(subFusingMaterializer)
              Strict(ResponseRenderingOutput.HttpData(r.asByteString))
            } else {
              HeadersAndStreamedEntity(
                r.asByteString,
                entityBytes)
            }

          @tailrec def completeResponseRendering(entity: ResponseEntity): StrictOrStreamed =
            entity match {
              case HttpEntity.Strict(_, data) =>
                renderHeaders(headers)
                renderEntityContentType(r, entity)
                renderContentLengthHeader(data.length) ~~ CrLf

                val finalBytes = {
                  if (!noEntity)
                    if (data.size < r.remainingCapacity) (r ~~ data).asByteString
                    else r.asByteString ++ data
                  else
                    r.asByteString
                }

                Strict {
                  closeMode match {
                    case SwitchToOtherProtocol(handler) =>
                      ResponseRenderingOutput.SwitchToOtherProtocol(finalBytes, handler)
                    case _ => ResponseRenderingOutput.HttpData(finalBytes)
                  }
                }

              case HttpEntity.Default(_, contentLength, data) =>
                renderHeaders(headers)
                renderEntityContentType(r, entity)
                renderContentLengthHeader(contentLength) ~~ CrLf
                headersAndEntity(data.via(CheckContentLengthTransformer.flow(contentLength)))

              case HttpEntity.CloseDelimited(_, data) =>
                renderHeaders(headers, alwaysClose = ctx.requestMethod != HttpMethods.HEAD)
                renderEntityContentType(r, entity) ~~ CrLf
                headersAndEntity(data)

              case HttpEntity.Chunked(contentType, chunks) =>
                if (ctx.requestProtocol == `HTTP/1.0`)
                  completeResponseRendering(HttpEntity.CloseDelimited(contentType, chunks.map(_.data)))
                else {
                  renderHeaders(headers)
                  renderEntityContentType(r, entity) ~~ CrLf
                  headersAndEntity(chunks.via(ChunkTransformer.flow))
                }
            }

          renderStatusLine()
          completeResponseRendering(entity)
        }
      }

    sealed trait StrictOrStreamed
    case class Strict(bytes: ResponseRenderingOutput) extends StrictOrStreamed
    case class HeadersAndStreamedEntity(headerBytes: ByteString, remainingData: Source[ByteString, Any])
        extends StrictOrStreamed
  }