override def createLogic()

in http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/HttpsProxyGraphStage.scala [81:206]


  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with StageLogging {
      private var state: State = Starting

      lazy val parser = {
        val p = new HttpResponseParser(settings.parserSettings, HttpHeaderParser(settings.parserSettings, log)) {
          override def handleInformationalResponses = false

          override protected def parseMessage(input: ByteString, offset: Int): StateResult = {
            // hacky, we want in the first branch *all fragments* of the first response
            if (offset == 0) {
              super.parseMessage(input, offset)
            } else {
              if (input.size > offset) {
                emit(RemainingBytes(input.drop(offset)))
              } else {
                emit(NeedMoreData)
              }
              terminate()
            }
          }
        }
        p.setContextForNextResponse(HttpResponseParser.ResponseContext(HttpMethods.CONNECT, None))
        p
      }

      setHandler(sslIn,
        new InHandler {
          override def onPush() = {
            state match {
              case Starting =>
                throw new IllegalStateException("inlet OutgoingSSL.in unexpectedly pushed in Starting state")
              case Connecting =>
                throw new IllegalStateException("inlet OutgoingSSL.in unexpectedly pushed in Connecting state")
              case Connected =>
                push(bytesOut, grab(sslIn))
            }
          }

          override def onUpstreamFinish(): Unit = {
            complete(bytesOut)
          }

        })

      setHandler(bytesIn,
        new InHandler {
          override def onPush() = {
            state match {
              case Starting =>
              // that means that proxy had sent us something even before CONNECT to proxy was sent, therefore we just ignore it
              case Connecting =>
                val proxyResponse = grab(bytesIn)
                parser.parseBytes(proxyResponse) match {
                  case NeedMoreData =>
                    pull(bytesIn)
                  case ResponseStart(_: StatusCodes.Success, _, _, _, _, _) =>
                    var pushed = false
                    val parseResult = parser.onPull()
                    require(parseResult == ParserOutput.MessageEnd,
                      s"parseResult should be MessageEnd but was $parseResult")
                    parser.onPull() match {
                      // NeedMoreData is what we emit in overridden `parseMessage` in case input.size == offset
                      case NeedMoreData =>
                      case RemainingBytes(bytes) =>
                        push(sslOut, bytes) // parser already read more than expected, forward that data directly
                        pushed = true
                      case other =>
                        throw new IllegalStateException(s"unexpected element of type ${other.getClass}")
                    }
                    parser.onUpstreamFinish()

                    log.debug(s"HTTP(S) proxy connection to {}:{} established. Now forwarding data.", targetHostName,
                      targetPort)

                    state = Connected
                    if (isAvailable(bytesOut)) pull(sslIn)
                    if (isAvailable(sslOut)) pull(bytesIn)
                  case ResponseStart(statusCode, _, _, _, _, _) =>
                    failStage(new ProxyConnectionFailedException(
                      s"The HTTP(S) proxy rejected to open a connection to $targetHostName:$targetPort with status code: $statusCode"))
                  case other =>
                    throw new IllegalStateException(s"unexpected element of type $other")
                }

              case Connected =>
                push(sslOut, grab(bytesIn))
            }
          }

          override def onUpstreamFinish(): Unit = complete(sslOut)

        })

      setHandler(bytesOut,
        new OutHandler {
          override def onPull() = {
            state match {
              case Starting =>
                log.debug(
                  s"TCP connection to HTTP(S) proxy connection established. Sending CONNECT {}:{} to HTTP(S) proxy",
                  targetHostName, targetPort)
                push(bytesOut, connectMsg)
                state = Connecting
              case Connecting =>
              // don't need to do anything
              case Connected =>
                pull(sslIn)
            }
          }

          override def onDownstreamFinish(): Unit = cancel(sslIn)

        })

      setHandler(sslOut,
        new OutHandler {
          override def onPull() = {
            pull(bytesIn)
          }

          override def onDownstreamFinish(): Unit = cancel(bytesIn)

        })

    }