in http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala [91:252]
/**
 * Creates the bidirectional layer backed by a freshly instantiated `ProtocolSwitchStage`.
 *
 * @param settings server settings handed to the stage
 * @param log logging adapter handed to the stage
 * @return a `BidiFlow` built from the new stage graph
 */
def websocketSupport(settings: ServerSettings, log: LoggingAdapter)
    : BidiFlow[ResponseRenderingOutput, ByteString, SessionBytes, SessionBytes, NotUsed] = {
  val protocolSwitch = new ProtocolSwitchStage(settings, log)
  BidiFlow.fromGraph(protocolSwitch)
}
/**
 * Combines the `rendering` flow (first direction) and the `parsing` flow (second direction)
 * into a single `BidiFlow`.
 *
 * @param settings server settings passed to both flows
 * @param log logging adapter passed to both flows
 * @param isSecureConnection forwarded to `parsing`
 * @param dateHeaderRendering forwarded to `rendering`
 */
def parsingRendering(settings: ServerSettings, log: LoggingAdapter, isSecureConnection: Boolean,
    dateHeaderRendering: DateHeaderRendering)
    : BidiFlow[ResponseRenderingContext, ResponseRenderingOutput, SessionBytes, RequestOutput, NotUsed] = {
  // constructed in the same order in which the original argument list evaluated them
  val renderingFlow = rendering(settings, log, dateHeaderRendering)
  val parsingFlow = parsing(settings, log, isSecureConnection)
  BidiFlow.fromFlows(renderingFlow, parsingFlow)
}
/**
 * Creates the controller layer: a `BidiFlow` built from a new `ControllerStage`, with its
 * two directions swapped via `reversed`.
 *
 * @param settings server settings handed to the stage
 * @param log logging adapter handed to the stage
 */
def controller(settings: ServerSettings, log: LoggingAdapter)
    : BidiFlow[HttpResponse, ResponseRenderingContext, RequestOutput, RequestOutput, NotUsed] = {
  val controllerStage = new ControllerStage(settings, log)
  BidiFlow.fromGraph(controllerStage).reversed
}
/**
 * Creates the request-preparation layer: responses pass through an unmodified `Flow[HttpResponse]`
 * while parser output is run through a new `PrepareRequests` stage.
 *
 * @param settings server settings handed to `PrepareRequests`
 */
def requestPreparation(
    settings: ServerSettings): BidiFlow[HttpResponse, HttpResponse, RequestOutput, HttpRequest, NotUsed] = {
  val responsePassThrough = Flow[HttpResponse]
  val prepareRequests = new PrepareRequests(settings)
  BidiFlow.fromFlows(responsePassThrough, prepareRequests)
}
/**
 * Creates the request-timeout layer.
 *
 * A `timeout` equal to `Duration.Zero` yields an identity `BidiFlow`; any other value yields
 * a `BidiFlow` built from a new `RequestTimeoutSupport` stage with its directions swapped via `reversed`.
 *
 * @param timeout the timeout handed to `RequestTimeoutSupport`
 * @param log logging adapter handed to `RequestTimeoutSupport`
 */
def requestTimeoutSupport(
    timeout: Duration, log: LoggingAdapter): BidiFlow[HttpResponse, HttpResponse, HttpRequest, HttpRequest, NotUsed] =
  if (timeout != Duration.Zero) {
    val timeoutStage = new RequestTimeoutSupport(timeout, log)
    BidiFlow.fromGraph(timeoutStage).reversed
  } else BidiFlow.identity[HttpResponse, HttpRequest]
/**
 * Two-state stage that turns the parser's `RequestOutput` elements into `HttpRequest`s.
 *
 * In the "idle" state (whose in and out handlers are implemented directly on the `GraphStageLogic`) an incoming
 * `RequestStart` is converted into an `HttpRequest` and pushed downstream. If the entity is strict nothing else
 * happens. If the entity is streamed (the `StreamedEntityCreator` case in `createEntity`) the stage switches to a
 * second state which pushes incoming elements into the streaming entity until `MessageEnd` is received, and then
 * switches back to "idle".
 *
 * @param settings server settings consulted for transparent HEAD handling, remote address propagation and the
 *                 maximum content length applied to the request entity
 */
final class PrepareRequests(settings: ServerSettings) extends GraphStage[FlowShape[RequestOutput, HttpRequest]] {
  val in = Inlet[RequestOutput]("PrepareRequests.in")
  val out = Outlet[HttpRequest]("PrepareRequests.out")
  override val shape: FlowShape[RequestOutput, HttpRequest] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes) =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      // remote address of the connection, if it was supplied via the stage attributes
      val remoteAddressOpt = inheritedAttributes.get[HttpAttributes.RemoteAddress].map(_.address)

      // set when downstream pulled `out` while a streamed entity was still being fed
      var downstreamPullWaiting = false
      // set when downstream finished `out` while a streamed entity was still being fed
      var completionDeferred = false
      // sub-source feeding the currently streamed request entity; null while no streamed entity is in flight
      var entitySource: SubSourceOutlet[RequestOutput] = _

      // optimization: to avoid allocations the "idle" case in and out handlers are put directly on the GraphStageLogic itself
      override def onPull(): Unit = {
        pull(in)
      }

      // optimization: this callback is used to handle entity substream cancellation to avoid allocating a dedicated handler
      // (this logic is installed as the handler of both `out` in the idle state and of `entitySource`)
      override def onDownstreamFinish(cause: Throwable): Unit = {
        if (entitySource ne null) {
          // application layer has cancelled or only partially consumed the request entity:
          // connection will be closed
          entitySource.complete()
        }
        completeStage()
      }

      // no idle-state specific handling; simply delegates to the inherited behaviour
      override def onUpstreamFinish(): Unit = super.onUpstreamFinish()

      override def onUpstreamFailure(ex: Throwable): Unit = {
        if (entitySource ne null) {
          // upstream failed while a streamed request entity is still registered:
          // propagate the failure to the consumer of that entity as well
          entitySource.fail(ex)
        }
        super.onUpstreamFailure(ex)
      }

      // idle state: the only element expected here is a `RequestStart`
      override def onPush(): Unit = grab(in) match {
        case RequestStart(method, uri, protocol, attrs, hdrs, entityCreator, _, _) =>
          val effectiveMethod = if (method == HttpMethods.HEAD && settings.transparentHeadRequests) HttpMethods.GET
          else method

          // prepend a `Remote-Address` header only if enabled in the settings and the address is known
          @nowarn("msg=use remote-address-attribute instead")
          val effectiveHeaders = remoteAddressOpt match {
            case Some(address) if settings.remoteAddressHeader =>
              headers.`Remote-Address`(RemoteAddress(address)) +: hdrs
            case _ => hdrs
          }

          // note: for a streamed entity `createEntity` also switches the handlers to the streaming state
          val entity = createEntity(entityCreator).withSizeLimit(settings.parserSettings.maxContentLength)
          val httpRequest = HttpRequest(effectiveMethod, uri, effectiveHeaders, entity, protocol)
            .withAttributes(attrs)

          val effectiveHttpRequest = if (settings.remoteAddressAttribute) {
            remoteAddressOpt.fold(httpRequest) { remoteAddress =>
              httpRequest.addAttribute(AttributeKeys.remoteAddress, RemoteAddress(remoteAddress))
            }
          } else httpRequest

          push(out, effectiveHttpRequest)
        case other =>
          throw new IllegalStateException(s"unexpected element of type ${other.getClass}")
      }

      // start out in the idle state
      setIdleHandlers()

      /**
       * Switches (back) to the idle state. If downstream finished while an entity was being streamed the stage
       * is completed instead; otherwise this logic is installed as handler for both ports and a pull that
       * arrived during entity streaming is replayed.
       */
      def setIdleHandlers(): Unit = {
        if (completionDeferred) {
          completeStage()
        } else {
          setHandler(in, this)
          setHandler(out, this)
          if (downstreamPullWaiting) {
            downstreamPullWaiting = false
            pull(in)
          }
        }
      }

      /** Returns the strict entity as is, or sets up streaming for a streamed entity. */
      def createEntity(creator: EntityCreator[RequestOutput, RequestEntity]): RequestEntity =
        creator match {
          case StrictEntityCreator(entity)    => entity
          case StreamedEntityCreator(creator) => streamRequestEntity(creator)
        }

      /**
       * Creates the sub-source backing a streamed request entity, switches `in` and `out` to the streaming
       * handlers and returns the entity produced by `creator` from that sub-source.
       */
      def streamRequestEntity(
          creator: (Source[ParserOutput.RequestOutput, NotUsed]) => RequestEntity): RequestEntity = {
        // stream incoming chunks into the request entity until we reach the end of it
        // and then toggle back to "idle"
        entitySource = new SubSourceOutlet[RequestOutput]("EntitySource")
        // optimization: re-use the idle outHandler
        entitySource.setHandler(this)

        // optimization: handlers are combined to reduce allocations
        val chunkedRequestHandler = new InHandler with OutHandler {
          def onPush(): Unit = {
            grab(in) match {
              case MessageEnd =>
                entitySource.complete()
                // must be cleared before going back to idle, the idle handlers use it as "entity in flight" marker
                entitySource = null
                setIdleHandlers()
              case x => entitySource.push(x)
            }
          }
          override def onUpstreamFinish(): Unit = {
            entitySource.complete()
            completeStage()
          }
          override def onUpstreamFailure(ex: Throwable): Unit = {
            entitySource.fail(ex)
            failStage(ex)
          }
          override def onPull(): Unit = {
            // remember this until we are done with the chunked entity
            // so can pull downstream then
            downstreamPullWaiting = true
          }
          override def onDownstreamFinish(cause: Throwable): Unit = {
            // downstream signalled not wanting any more requests
            // we should keep processing the entity stream and then
            // when it completes complete the stage
            completionDeferred = true
          }
        }

        setHandler(in, chunkedRequestHandler)
        setHandler(out, chunkedRequestHandler)
        creator(Source.fromGraph(entitySource.source))
      }
    }
}