def websocketUpgrade()

in http-core/src/main/scala/org/apache/pekko/http/impl/engine/ws/Handshake.scala [87:149]


    /**
     * Checks whether the given request headers form a WebSocket upgrade request and, if so,
     * returns an `UpgradeToWebSocketLowLevel` that can build the handshake response.
     *
     * The request qualifies only if all of the following are present:
     *  - an `Upgrade` header for which `hasWebSocket` holds,
     *  - a `Connection` header for which `hasUpgrade` holds,
     *  - a `Sec-WebSocket-Version` header that has `CurrentWebSocketVersion`,
     *  - a `Sec-WebSocket-Key` header for which `isValid` holds.
     *
     * @param headers           the request headers to inspect
     * @param hostHeaderPresent not consulted by this method (the Host header is validated in general HTTP logic)
     * @param settings          passed on to `buildResponse` when the upgrade is handled
     * @param log               passed on to `buildResponse` when the upgrade is handled
     * @return `OptionVal.Some` with the upgrade handle if the request qualifies, `OptionVal.None` otherwise
     */
    def websocketUpgrade(headers: List[HttpHeader], hostHeaderPresent: Boolean, settings: WebSocketSettings,
        log: LoggingAdapter): OptionVal[UpgradeToWebSocketLowLevel] = {

      // Notes on headers that are REQUIRED to be present here:
      // - Host header is validated in general HTTP logic
      // - Origin header is optional and, if required, should be validated
      //   on higher levels (routing, application logic)
      //
      // TODO See #18709 Extension support is optional in WS and currently unsupported.
      //
      // these are not needed directly, we verify their presence and correctness only:
      // - Upgrade
      // - Connection
      // - `Sec-WebSocket-Version`
      def hasAllRequiredWebsocketUpgradeHeaders: Boolean = {
        // Single pass through the headers list, stopping as soon as every requirement is satisfied.
        // Each requirement is tracked by its own flag (rather than a shared counter) so that a
        // repeated header, e.g. two matching `Upgrade` headers, cannot stand in for a missing one.
        val it = headers.iterator
        var upgradeToWebSocket = false
        var connectionUpgrade = false
        var versionSupported = false

        def allRequirementsMet: Boolean = upgradeToWebSocket && connectionUpgrade && versionSupported

        while (it.hasNext && !allRequirementsMet) it.next() match {
          case u: Upgrade                 => if (u.hasWebSocket) upgradeToWebSocket = true
          case c: Connection              => if (c.hasUpgrade) connectionUpgrade = true
          case v: `Sec-WebSocket-Version` => if (v.hasVersion(CurrentWebSocketVersion)) versionSupported = true
          case _                          => // not relevant for the upgrade check, continue...
        }
        allRequirementsMet
      }

      if (hasAllRequiredWebsocketUpgradeHeaders) {
        val key = HttpHeader.fastFind(classOf[`Sec-WebSocket-Key`], headers)
        if (key.isDefined && key.get.isValid) {
          val protocol = HttpHeader.fastFind(classOf[`Sec-WebSocket-Protocol`], headers)

          // subprotocols offered by the client; empty if no `Sec-WebSocket-Protocol` header was sent
          val clientSupportedSubprotocols = protocol match {
            case OptionVal.Some(p) => p.protocols
            case _                 => Nil
          }

          val header = new UpgradeToWebSocketLowLevel {
            def requestedProtocols: Seq[String] = clientSupportedSubprotocols

            def handle(
                handler: Either[Graph[FlowShape[FrameEvent, FrameEvent], Any], Graph[FlowShape[Message, Message], Any]],
                subprotocol: Option[String]): HttpResponse = {
              // a chosen subprotocol must be one of those offered by the client; choosing none is always fine
              require(
                subprotocol.forall(chosen => clientSupportedSubprotocols.contains(chosen)),
                s"Tried to choose invalid subprotocol '$subprotocol' which wasn't offered by the client: [${requestedProtocols.mkString(", ")}]")
              buildResponse(key.get, handler, subprotocol, settings, log)
            }

            def handleFrames(
                handlerFlow: Graph[FlowShape[FrameEvent, FrameEvent], Any], subprotocol: Option[String]): HttpResponse =
              handle(Left(handlerFlow), subprotocol)

            override def handleMessages(handlerFlow: Graph[FlowShape[Message, Message], Any],
                subprotocol: Option[String] = None): HttpResponse =
              handle(Right(handlerFlow), subprotocol)
          }
          OptionVal.Some(header)
        } else OptionVal.None
      } else OptionVal.None
    }