def websocketSupport()

in http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala [91:252]


  def websocketSupport(settings: ServerSettings, log: LoggingAdapter)
      : BidiFlow[ResponseRenderingOutput, ByteString, SessionBytes, SessionBytes, NotUsed] =
    BidiFlow.fromGraph(new ProtocolSwitchStage(settings, log))

  def parsingRendering(settings: ServerSettings, log: LoggingAdapter, isSecureConnection: Boolean,
      dateHeaderRendering: DateHeaderRendering)
      : BidiFlow[ResponseRenderingContext, ResponseRenderingOutput, SessionBytes, RequestOutput, NotUsed] =
    BidiFlow.fromFlows(rendering(settings, log, dateHeaderRendering), parsing(settings, log, isSecureConnection))

  def controller(settings: ServerSettings, log: LoggingAdapter)
      : BidiFlow[HttpResponse, ResponseRenderingContext, RequestOutput, RequestOutput, NotUsed] =
    BidiFlow.fromGraph(new ControllerStage(settings, log)).reversed

  def requestPreparation(
      settings: ServerSettings): BidiFlow[HttpResponse, HttpResponse, RequestOutput, HttpRequest, NotUsed] =
    BidiFlow.fromFlows(Flow[HttpResponse], new PrepareRequests(settings))

  def requestTimeoutSupport(
      timeout: Duration, log: LoggingAdapter): BidiFlow[HttpResponse, HttpResponse, HttpRequest, HttpRequest, NotUsed] =
    if (timeout == Duration.Zero) BidiFlow.identity[HttpResponse, HttpRequest]
    else BidiFlow.fromGraph(new RequestTimeoutSupport(timeout, log)).reversed

  /**
   * Two state stage, either transforms an incoming RequestOutput into a HttpRequest with strict entity and then pushes
   * that (the "idle" inHandler) or creates a HttpRequest with a streamed entity and switch to a state which will push
   * incoming chunks into the streaming entity until end of request is reached (the StreamedEntityCreator case in create
   * entity).
   */
  final class PrepareRequests(settings: ServerSettings) extends GraphStage[FlowShape[RequestOutput, HttpRequest]] {
    val in = Inlet[RequestOutput]("PrepareRequests.in")
    val out = Outlet[HttpRequest]("PrepareRequests.out")
    override val shape: FlowShape[RequestOutput, HttpRequest] = FlowShape.of(in, out)

    override def createLogic(inheritedAttributes: Attributes) =
      new GraphStageLogic(shape) with InHandler with OutHandler {
        val remoteAddressOpt = inheritedAttributes.get[HttpAttributes.RemoteAddress].map(_.address)

        var downstreamPullWaiting = false
        var completionDeferred = false
        var entitySource: SubSourceOutlet[RequestOutput] = _

        // optimization: to avoid allocations the "idle" case in and out handlers are put directly on the GraphStageLogic itself
        override def onPull(): Unit = {
          pull(in)
        }

        // optimization: this callback is used to handle entity substream cancellation to avoid allocating a dedicated handler
        override def onDownstreamFinish(): Unit = {
          if (entitySource ne null) {
            // application layer has cancelled or only partially consumed response entity:
            // connection will be closed
            entitySource.complete()
          }
          completeStage()
        }

        override def onUpstreamFinish(): Unit = super.onUpstreamFinish()
        override def onUpstreamFailure(ex: Throwable): Unit = {
          if (entitySource ne null) {
            // application layer has cancelled or only partially consumed response entity:
            // connection will be closed
            entitySource.fail(ex)
          }
          super.onUpstreamFailure(ex)
        }

        override def onPush(): Unit = grab(in) match {
          case RequestStart(method, uri, protocol, attrs, hdrs, entityCreator, _, _) =>
            val effectiveMethod = if (method == HttpMethods.HEAD && settings.transparentHeadRequests) HttpMethods.GET
            else method

            @nowarn("msg=use remote-address-attribute instead")
            val effectiveHeaders =
              if (settings.remoteAddressHeader && remoteAddressOpt.isDefined)
                headers.`Remote-Address`(RemoteAddress(remoteAddressOpt.get)) +: hdrs
              else hdrs

            val entity = createEntity(entityCreator).withSizeLimit(settings.parserSettings.maxContentLength)
            val httpRequest = HttpRequest(effectiveMethod, uri, effectiveHeaders, entity, protocol)
              .withAttributes(attrs)

            val effectiveHttpRequest = if (settings.remoteAddressAttribute) {
              remoteAddressOpt.fold(httpRequest) { remoteAddress =>
                httpRequest.addAttribute(AttributeKeys.remoteAddress, RemoteAddress(remoteAddress))
              }
            } else httpRequest

            push(out, effectiveHttpRequest)
          case other =>
            throw new IllegalStateException(s"unexpected element of type ${other.getClass}")
        }

        setIdleHandlers()

        def setIdleHandlers(): Unit = {
          if (completionDeferred) {
            completeStage()
          } else {
            setHandler(in, this)
            setHandler(out, this)
            if (downstreamPullWaiting) {
              downstreamPullWaiting = false
              pull(in)
            }
          }
        }

        def createEntity(creator: EntityCreator[RequestOutput, RequestEntity]): RequestEntity =
          creator match {
            case StrictEntityCreator(entity)    => entity
            case StreamedEntityCreator(creator) => streamRequestEntity(creator)
          }

        def streamRequestEntity(
            creator: (Source[ParserOutput.RequestOutput, NotUsed]) => RequestEntity): RequestEntity = {
          // stream incoming chunks into the request entity until we reach the end of it
          // and then toggle back to "idle"

          entitySource = new SubSourceOutlet[RequestOutput]("EntitySource")
          // optimization: re-use the idle outHandler
          entitySource.setHandler(this)

          // optimization: handlers are combined to reduce allocations
          val chunkedRequestHandler = new InHandler with OutHandler {
            def onPush(): Unit = {
              grab(in) match {
                case MessageEnd =>
                  entitySource.complete()
                  entitySource = null
                  setIdleHandlers()

                case x => entitySource.push(x)
              }
            }
            override def onUpstreamFinish(): Unit = {
              entitySource.complete()
              completeStage()
            }
            override def onUpstreamFailure(ex: Throwable): Unit = {
              entitySource.fail(ex)
              failStage(ex)
            }
            override def onPull(): Unit = {
              // remember this until we are done with the chunked entity
              // so can pull downstream then
              downstreamPullWaiting = true
            }
            override def onDownstreamFinish(): Unit = {
              // downstream signalled not wanting any more requests
              // we should keep processing the entity stream and then
              // when it completes complete the stage
              completionDeferred = true
            }
          }

          setHandler(in, chunkedRequestHandler)
          setHandler(out, chunkedRequestHandler)
          creator(Source.fromGraph(entitySource.source))
        }

      }
  }