in http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/OutgoingConnectionBlueprint.scala [186:288]
/**
 * Creates the stage logic that converts the `ResponseOutput` elements read from
 * `responseOutputIn` into `HttpResponse`s emitted on `httpResponseOut`.
 *
 * The logic rotates between three handler sets:
 *  - idle (the logic itself): expects a `ResponseStart` and emits the response,
 *  - `waitForMessageEnd`: used after a strict entity to consume the trailing `MessageEnd`,
 *  - `substreamHandler`: used after a streamed entity to forward elements into the entity
 *    substream until `MessageEnd` is seen.
 */
override def createLogic(effectiveAttributes: Attributes) =
  new GraphStageLogic(shape) with InHandler with OutHandler {
    // Non-null only between the start of a streamed entity and its MessageEnd.
    private var entitySource: SubSourceOutlet[ResponseOutput] = _
    // True once downstream finished while a streamed entity was still in flight.
    private var completionDeferred = false
    // Mirrors the `closeRequested` flag of the most recently seen ResponseStart.
    private var completeOnMessageEnd = false

    private def entitySubstreamStarted = entitySource ne null
    private def idle = this

    /** Returns to the idle handlers, or completes the stage if a completion is pending. */
    def setIdleHandlers(): Unit =
      if (!completeOnMessageEnd && !completionDeferred) setHandlers(responseOutputIn, httpResponseOut, idle)
      else completeStage()

    /** Idle state: the next element has to be the start of a response. */
    def onPush(): Unit = {
      val element = grab(responseOutputIn)
      element match {
        case ResponseStart(status, proto, attrs, hdrs, entityCreator, closeRequested) =>
          // createEntity also installs the handlers that deal with the rest of the message
          val responseEntity = createEntity(entityCreator).withSizeLimit(parserSettings.maxContentLength)
          push(httpResponseOut, new HttpResponse(status, hdrs, attrs, responseEntity, proto))
          completeOnMessageEnd = closeRequested

        case MessageStartError(_, info) =>
          throw IllegalResponseException(info)

        case other =>
          throw new IllegalStateException(s"ResponseStart expected but $other received.")
      }
    }

    /** Idle state: forward demand upstream unless a streamed entity is still in progress. */
    def onPull(): Unit =
      if (!entitySubstreamStarted) pull(responseOutputIn)

    /**
     * When downstream cancels in the middle of a streamed entity the stage is kept alive
     * until that entity has been fully delivered; otherwise it completes right away.
     */
    override def onDownstreamFinish(): Unit =
      if (entitySubstreamStarted) completionDeferred = true
      else completeStage()

    // start out in the idle state
    setIdleHandlers()

    // A strict entity is still followed by a MessageEnd that has to be consumed.
    lazy val waitForMessageEnd = new InHandler with OutHandler {
      def onPush(): Unit = {
        val element = grab(responseOutputIn)
        element match {
          case MessageEnd =>
            // serve demand that arrived while we were waiting for the end of the message
            if (isAvailable(httpResponseOut)) pull(responseOutputIn)
            setIdleHandlers()
          case other =>
            throw new IllegalStateException(s"MessageEnd expected but $other received.")
        }
      }

      // Intentionally empty: demand is picked up via isAvailable once MessageEnd arrives.
      override def onPull(): Unit = ()
    }

    // While a streamed entity is active, everything up to MessageEnd goes into the substream.
    private lazy val substreamHandler = new InHandler with OutHandler {
      override def onPush(): Unit = {
        val element = grab(responseOutputIn)
        element match {
          case MessageEnd =>
            entitySource.complete()
            entitySource = null
            // a pull on the response outlet may have been deferred while the entity was streaming
            if (isAvailable(httpResponseOut)) pull(responseOutputIn)
            setIdleHandlers()
          case part =>
            entitySource.push(part)
        }
      }

      // demand from the entity substream is passed straight on to upstream
      override def onPull(): Unit = pull(responseOutputIn)

      override def onUpstreamFinish(): Unit = {
        entitySource.complete()
        completeStage()
      }

      override def onUpstreamFailure(reason: Throwable): Unit = {
        entitySource.fail(reason)
        failStage(reason)
      }
    }

    /**
     * Produces the response entity for the given creator and installs the handlers that
     * process the remainder of the message.
     */
    private def createEntity(creator: EntityCreator[ResponseOutput, ResponseEntity]): ResponseEntity =
      creator match {
        case StrictEntityCreator(strictEntity) =>
          // the single element that was demanded has been consumed by the ResponseStart,
          // so ask for one more in order to receive the MessageEnd
          pull(responseOutputIn)
          setHandlers(responseOutputIn, httpResponseOut, waitForMessageEnd)
          strictEntity

        case StreamedEntityCreator(buildEntity) =>
          entitySource = new SubSourceOutlet[ResponseOutput]("EntitySource")
          entitySource.setHandler(substreamHandler)
          // only the inlet handler changes; the response outlet keeps its current handler
          setHandler(responseOutputIn, substreamHandler)
          buildEntity(Source.fromGraph(entitySource.source))
      }
  }