in http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/HttpsProxyGraphStage.scala [81:206]
/**
 * Creates the logic that performs the HTTP `CONNECT` handshake with the proxy and afterwards
 * relays bytes between the proxy connection (`bytesIn` / `bytesOut`) and the TLS side
 * (`sslIn` / `sslOut`).
 *
 * The logic is a three-step state machine kept in `state`:
 *  - `Starting`: nothing has been sent yet; the first pull on `bytesOut` pushes `connectMsg`
 *    and switches to `Connecting`.
 *  - `Connecting`: bytes arriving on `bytesIn` are fed to `parser`; a successful status switches
 *    to `Connected`, any other status fails the stage with a `ProxyConnectionFailedException`.
 *  - `Connected`: elements are forwarded unchanged in both directions.
 *
 * @param inheritedAttributes attributes passed in by the materializer (not read by this logic)
 * @return the stage logic instance for one materialization
 */
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
  new GraphStageLogic(shape) with StageLogging {
    // Current phase of the proxy handshake: Starting -> Connecting -> Connected.
    private var state: State = Starting

    // Response parser for the proxy's reply to CONNECT. It is only touched from the `Connecting`
    // branch of the `bytesIn` handler below.
    lazy val parser = {
      val p = new HttpResponseParser(settings.parserSettings, HttpHeaderParser(settings.parserSettings, log)) {
        override def handleInformationalResponses = false

        override protected def parseMessage(input: ByteString, offset: Int): StateResult = {
          // hacky, we want in the first branch *all fragments* of the first response
          if (offset == 0) {
            super.parseMessage(input, offset)
          } else {
            // Called with a non-zero offset: do not parse a second message. Hand back whatever
            // follows `offset` as raw bytes (or signal that there is nothing left) and stop.
            if (input.size > offset) {
              emit(RemainingBytes(input.drop(offset)))
            } else {
              emit(NeedMoreData)
            }
            terminate()
          }
        }
      }
      // The only request this stage ever sends to the proxy is a CONNECT.
      p.setContextForNextResponse(HttpResponseParser.ResponseContext(HttpMethods.CONNECT, None))
      p
    }

    // TLS side -> proxy connection. Only legal once the tunnel is established, because `sslIn`
    // is never pulled before the state is `Connected`.
    setHandler(sslIn,
      new InHandler {
        override def onPush(): Unit =
          state match {
            case Starting =>
              throw new IllegalStateException("inlet OutgoingSSL.in unexpectedly pushed in Starting state")
            case Connecting =>
              throw new IllegalStateException("inlet OutgoingSSL.in unexpectedly pushed in Connecting state")
            case Connected =>
              push(bytesOut, grab(sslIn))
          }

        override def onUpstreamFinish(): Unit =
          complete(bytesOut)
      })

    // Proxy connection -> TLS side. While `Connecting`, the bytes are the proxy's CONNECT response.
    setHandler(bytesIn,
      new InHandler {
        override def onPush(): Unit =
          state match {
            case Starting =>
              // that means that proxy had sent us something even before CONNECT to proxy was sent, therefore we just ignore it
              // The element is dropped, but demand must be re-signalled: `bytesIn` is otherwise only
              // pulled from `sslOut.onPull`, which will not fire again until something is pushed to
              // `sslOut`, so without this pull the CONNECT response would never be read.
              pull(bytesIn)

            case Connecting =>
              val proxyResponse = grab(bytesIn)
              parser.parseBytes(proxyResponse) match {
                case NeedMoreData =>
                  // Response head is still incomplete; wait for the next chunk from the proxy.
                  pull(bytesIn)

                case ResponseStart(_: StatusCodes.Success, _, _, _, _, _) =>
                  val parseResult = parser.onPull()
                  require(parseResult == ParserOutput.MessageEnd,
                    s"parseResult should be MessageEnd but was $parseResult")
                  parser.onPull() match {
                    // NeedMoreData is what we emit in overridden `parseMessage` in case input.size == offset
                    case NeedMoreData => ()
                    case RemainingBytes(bytes) =>
                      push(sslOut, bytes) // parser already read more than expected, forward that data directly
                    case other =>
                      throw new IllegalStateException(s"unexpected element of type ${other.getClass}")
                  }
                  parser.onUpstreamFinish()

                  log.debug("HTTP(S) proxy connection to {}:{} established. Now forwarding data.", targetHostName,
                    targetPort)

                  state = Connected
                  // Re-issue demand that arrived while the handshake was still in progress.
                  // If RemainingBytes was pushed above, `sslOut` is no longer available and
                  // `bytesIn` is pulled later by `sslOut.onPull`.
                  if (isAvailable(bytesOut)) pull(sslIn)
                  if (isAvailable(sslOut)) pull(bytesIn)

                case ResponseStart(statusCode, _, _, _, _, _) =>
                  failStage(new ProxyConnectionFailedException(
                    s"The HTTP(S) proxy rejected to open a connection to $targetHostName:$targetPort with status code: $statusCode"))

                case other =>
                  throw new IllegalStateException(s"unexpected element of type $other")
              }

            case Connected =>
              push(sslOut, grab(bytesIn))
          }

        override def onUpstreamFinish(): Unit = complete(sslOut)
      })

    // Demand from the proxy connection. The very first pull is used to send the CONNECT request.
    setHandler(bytesOut,
      new OutHandler {
        override def onPull(): Unit =
          state match {
            case Starting =>
              log.debug(
                "TCP connection to HTTP(S) proxy connection established. Sending CONNECT {}:{} to HTTP(S) proxy",
                targetHostName, targetPort)
              push(bytesOut, connectMsg)
              state = Connecting
            case Connecting =>
              // don't need to do anything
              // (this demand is picked up via `isAvailable(bytesOut)` when switching to Connected)
              ()
            case Connected =>
              pull(sslIn)
          }

        override def onDownstreamFinish(): Unit = cancel(sslIn)
      })

    // Demand from the TLS side is forwarded to the proxy connection in every state; during the
    // handshake this is what makes the proxy's CONNECT response arrive on `bytesIn`.
    setHandler(sslOut,
      new OutHandler {
        override def onPull(): Unit =
          pull(bytesIn)

        override def onDownstreamFinish(): Unit = cancel(bytesIn)
      })
  }