in http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala [429:656]
override def initialAttributes = Attributes.name("ControllerStage")
val shape = new BidiShape(requestParsingIn, requestPrepOut, httpResponseIn, responseCtxOut)
override private[pekko] def createLogicAndMaterializedValue(inheritedAttributes: Attributes,
outerMaterializer: Materializer) =
new GraphStageLogic(shape) {
val parsingErrorHandler: ParsingErrorHandler =
settings.parsingErrorHandlerInstance(ActorMaterializerHelper.downcast(outerMaterializer).system)
val pullHttpResponseIn = () => tryPull(httpResponseIn)
var openRequests = immutable.Queue[RequestStart]()
var oneHundredContinueResponsePending = false
var pullSuppressed = false
var messageEndPending = false
setHandler(requestParsingIn,
new InHandler {
def onPush(): Unit =
grab(requestParsingIn) match {
case r: RequestStart =>
openRequests = openRequests.enqueue(r)
messageEndPending = r.createEntity.isInstanceOf[StreamedEntityCreator[_, _]]
val rs = if (r.expect100Continue) {
r.createEntity match {
case StrictEntityCreator(HttpEntity.Strict(_, _)) =>
// This covers two cases:
// - Either: The strict entity got all its data send already, so no need to wait for more data
// - Or: The strict entity contains no data (Content-Length header value was 0 or it did not exist), the client will not send any data
r
case _ =>
oneHundredContinueResponsePending = true
r.copy(createEntity = with100ContinueTrigger(r.createEntity))
}
} else r
push(requestPrepOut, rs)
case MessageEnd =>
messageEndPending = false
push(requestPrepOut, MessageEnd)
case MessageStartError(status, info) => finishWithIllegalRequestError(status, info)
case x: EntityStreamError if messageEndPending && openRequests.isEmpty =>
// client terminated the connection after receiving an early response to 100-continue
completeStage()
case x =>
push(requestPrepOut, x)
}
override def onUpstreamFinish() =
if (openRequests.isEmpty) completeStage()
else complete(requestPrepOut)
})
setHandler(requestPrepOut,
new OutHandler {
def onPull(): Unit =
if (oneHundredContinueResponsePending) pullSuppressed = true
else if (!hasBeenPulled(requestParsingIn)) pull(requestParsingIn)
override def onDownstreamFinish(): Unit =
if (openRequests.isEmpty) completeStage()
else failStage(
new IllegalStateException("User handler flow was cancelled with ongoing request") with NoStackTrace)
})
setHandler(httpResponseIn,
new InHandler {
def onPush(): Unit = {
val requestStart = openRequests.head
openRequests = openRequests.tail
val response0 = grab(httpResponseIn)
val response =
if (response0.entity.isStrict) response0 // response stream cannot fail
else response0.mapEntity { e =>
val (newEntity, fut) = HttpEntity.captureTermination(e)
fut.onComplete {
case Failure(ex) =>
log.error(ex,
s"Response stream for [${requestStart.debugString}] failed with '${ex.getMessage}'. Aborting connection.")
case _ => // ignore
}(ExecutionContexts.sameThreadExecutionContext)
newEntity
}
val isEarlyResponse = messageEndPending && openRequests.isEmpty
if (isEarlyResponse && response.status.isSuccess)
log.warning(
s"Sending an 2xx 'early' response before end of request for ${requestStart.uri} received... " +
"Note that the connection will be closed after this response. Also, many clients will not read early responses! " +
"Consider only issuing this response after the request data has been completely read!")
val forceClose =
(requestStart.expect100Continue && oneHundredContinueResponsePending) ||
(isClosed(requestParsingIn) && openRequests.isEmpty) ||
isEarlyResponse
val close =
if (forceClose) CloseRequested.ForceClose
else if (requestStart.closeRequested) CloseRequested.RequestAskedForClosing
else CloseRequested.Unspecified
emit(responseCtxOut,
ResponseRenderingContext(response, requestStart.method, requestStart.protocol, close),
pullHttpResponseIn)
if (!isClosed(requestParsingIn) && close.shouldClose && requestStart.expect100Continue)
maybePullRequestParsingIn()
}
override def onUpstreamFinish() =
if (openRequests.isEmpty && isClosed(requestParsingIn)) completeStage()
else complete(responseCtxOut)
override def onUpstreamFailure(ex: Throwable): Unit =
ex match {
case EntityStreamException(errorInfo) =>
// the application has forwarded a request entity stream error to the response stream
finishWithIllegalRequestError(StatusCodes.BadRequest, errorInfo)
case EntityStreamSizeException(limit, contentLength) =>
val summary = contentLength match {
case Some(cl) => s"Request Content-Length of $cl bytes exceeds the configured limit of $limit bytes"
case None =>
s"Aggregated data length of request entity exceeds the configured limit of $limit bytes"
}
val info =
ErrorInfo(summary, "Consider increasing the value of pekko.http.server.parsing.max-content-length")
finishWithIllegalRequestError(StatusCodes.PayloadTooLarge, info)
case IllegalUriException(errorInfo) =>
finishWithIllegalRequestError(StatusCodes.BadRequest, errorInfo)
case ex: ServerTerminationDeadlineReached => failStage(ex)
case NonFatal(e) =>
log.error(e, "Internal server error, sending 500 response")
emitErrorResponse(HttpResponse(StatusCodes.InternalServerError))
}
})
setHandler(responseCtxOut,
new OutHandler {
override def onPull() = {
pull(httpResponseIn)
// after the initial pull here we only ever pull after having emitted in `onPush` of `httpResponseIn`
setHandler(responseCtxOut, GraphStageLogic.EagerTerminateOutput)
}
})
def finishWithIllegalRequestError(status: StatusCode, info: ErrorInfo): Unit = {
val errorResponse = JavaMapping.toScala(parsingErrorHandler.handle(status, info, log, settings))
emitErrorResponse(errorResponse)
}
def emitErrorResponse(response: HttpResponse): Unit =
emit(responseCtxOut, ResponseRenderingContext(response, closeRequested = CloseRequested.ForceClose),
() => completeStage())
def maybePullRequestParsingIn(): Unit =
if (pullSuppressed) {
pullSuppressed = false
pull(requestParsingIn)
}
/**
* The `Expect: 100-continue` header has a special status in HTTP.
* It allows the client to send an `Expect: 100-continue` header with the request and then pause request sending
* (i.e. hold back sending the request entity). The server reads the request headers, determines whether it wants to
* accept the request and responds with
*
* - `417 Expectation Failed`, if it doesn't support the `100-continue` expectation
* (or if the `Expect` header contains other, unsupported expectations).
* - a `100 Continue` response,
* if it is ready to accept the request entity and the client should go ahead with sending it
* - a final response (like a 4xx to signal some client-side error
* (e.g. if the request entity length is beyond the configured limit) or a 3xx redirect)
*
* Only if the client receives a `100 Continue` response from the server is it allowed to continue sending the request
* entity. In this case it will receive another response after having completed request sending.
* So this special feature breaks the normal "one request - one response" logic of HTTP!
* It therefore requires special handling in all HTTP stacks (client- and server-side).
*
* For us this means:
*
* - on the server-side:
* After having read a `Expect: 100-continue` header with the request we package up an `HttpRequest` instance and send
* it through to the application. Only when (and if) the application then requests data from the entity stream do we
* send out a `100 Continue` response and continue reading the request entity.
* The application can therefore determine itself whether it wants the client to send the request entity
* by deciding whether to look at the request entity data stream or not.
* If the application sends a response *without* having looked at the request entity the client receives this
* response *instead of* the `100 Continue` response and the server closes the connection afterwards.
*
* - on the client-side:
* If the user adds a `Expect: 100-continue` header to the request we need to hold back sending the entity until
* we've received a `100 Continue` response.
*/
val emit100ContinueResponse =
getAsyncCallback[Unit] { _ =>
oneHundredContinueResponsePending = false
emit(responseCtxOut, ResponseRenderingContext(HttpResponse(StatusCodes.Continue)))
maybePullRequestParsingIn()
}
case object OneHundredContinueStage extends GraphStage[FlowShape[ParserOutput, ParserOutput]] {
val in: Inlet[ParserOutput] = Inlet("OneHundredContinueStage.in")
val out: Outlet[ParserOutput] = Outlet("OneHundredContinueStage.out")
override val shape: FlowShape[ParserOutput, ParserOutput] = FlowShape(in, out)
override def initialAttributes = Attributes.name("expect100continueTrigger")
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
private var oneHundredContinueSent = false
override def onPush(): Unit = push(out, grab(in))
override def onPull(): Unit = {
if (!oneHundredContinueSent) {
oneHundredContinueSent = true
emit100ContinueResponse.invoke(())
}
pull(in)
}
setHandlers(in, out, this)
}
}
def with100ContinueTrigger[T <: ParserOutput](createEntity: EntityCreator[T, RequestEntity]) =
StreamedEntityCreator {
createEntity.compose[Source[T, NotUsed]] {
_.via(OneHundredContinueStage.asInstanceOf[GraphStage[FlowShape[T, T]]])
}
}
} -> NotUsed