in http-core/src/main/scala/org/apache/pekko/http/impl/engine/parsing/HttpResponseParser.scala [139:230]
/**
 * Chooses the entity framing for the response whose head has just been parsed, emits the matching
 * parser output and then continues with the state that consumes the body or the next message.
 *
 * Framing is decided in this order:
 *  - status codes that do not allow an entity, and responses to `CONNECT`, are finished as empty responses;
 *  - responses to `HEAD` with a positive `Content-Length` get a `Default` entity of that length backed by an
 *    empty source, and parsing resumes at `bodyStart`; any other `HEAD` response is finished as empty;
 *  - chunked responses are rejected if a `Content-Length` header is also present, otherwise handed to `parseChunk`;
 *  - non-chunked responses with a `Content-Length` become empty, strict (body fully contained in `input`)
 *    or fixed-length streamed entities;
 *  - non-chunked responses without a `Content-Length` are read until the connection closes.
 *
 * @param headers                      headers placed into the emitted `ResponseStart`
 * @param protocol                     protocol placed into the emitted `ResponseStart`
 * @param input                        the bytes currently being parsed
 * @param bodyStart                    offset into `input` at which the entity bytes (if any) begin
 * @param clh                          the `Content-Length` header, if the response carried one
 * @param cth                          the `Content-Type` header, if the response carried one
 * @param isChunked                    whether the response was determined to use chunked transfer encoding
 * @param expect100continue            not read by this method
 * @param hostHeaderPresent            not read by this method
 * @param closeAfterResponseCompletion close flag to emit, unless a failed 100-continue expectation forces `true`
 * @param sslSession                   attached as a response attribute when `settings.includeSslSessionAttribute` is set
 * @return the result of whichever continuation was invoked (`startNewMessage`, `parseFixedLengthBody`,
 *         `parseToCloseBody`, `parseChunk` or `failMessageStart`)
 */
protected final def parseEntity(headers: List[HttpHeader], protocol: HttpProtocol, input: ByteString, bodyStart: Int,
    clh: Option[`Content-Length`], cth: Option[`Content-Type`], isChunked: Boolean,
    expect100continue: Boolean, hostHeaderPresent: Boolean, closeAfterResponseCompletion: Boolean,
    sslSession: SSLSession): StateResult = {

  // Emits the `ResponseStart` for the current response. Before emitting, a pending 100-continue trigger
  // (if the request context has one) is resolved: completed on a success status, failed otherwise.
  def emitResponseStart(
      createEntity: EntityCreator[ResponseOutput, ResponseEntity],
      responseHeaders: List[HttpHeader] = headers) = {
    val attributes: Map[AttributeKey[_], Any] =
      if (!settings.includeSslSessionAttribute) Map.empty
      else Map(AttributeKeys.sslSession -> SslSessionInfo(sslSession))

    val pendingContinue = contextForCurrentResponse.get.oneHundredContinueTrigger
    val close = pendingContinue match {
      case Some(trigger) =>
        if (statusCode.isSuccess) {
          trigger.trySuccess(())
          closeAfterResponseCompletion
        } else {
          // the trigger is failed and the response is always marked as closing
          trigger.tryFailure(OneHundredContinueError)
          true
        }
      case None => closeAfterResponseCompletion
    }

    emit(ResponseStart(statusCode, protocol, attributes, responseHeaders, createEntity, close))
  }

  // Shared tail for responses that are complete once their start has been emitted:
  // marks completion as OK, emits `MessageEnd` and resumes parsing at `nextMessageStart`.
  def endMessageAndContinueAt(nextMessageStart: Int) = {
    setCompletionHandling(HttpMessageParser.CompletionOk)
    emit(MessageEnd)
    startNewMessage(input, nextMessageStart)
  }

  // Finishes a response that has no body bytes to consume.
  def finishEmptyResponse() =
    statusCode match {
      case _: StatusCodes.Informational if handleInformationalResponses =>
        if (statusCode == StatusCodes.Continue) {
          val pendingContinue = contextForCurrentResponse.get.oneHundredContinueTrigger
          pendingContinue.foreach(trigger => trigger.trySuccess(()))
        }
        // RFC 7231, section 6.2 (http://tools.ietf.org/html/rfc7231#section-6.2) requires a client to be
        // able to parse 1xx responses preceding the final response, even unexpected ones. Nothing is
        // emitted for this interim response; parsing simply moves on to the next message.
        startNewMessage(input, bodyStart)
      case _ =>
        emitResponseStart(emptyEntity(cth))
        endMessageAndContinueAt(bodyStart)
    }

  if (!statusCode.allowsEntity) finishEmptyResponse()
  else
    contextForCurrentResponse.get.requestMethod match {
      case HttpMethods.HEAD =>
        clh match {
          case Some(`Content-Length`(declaredLength)) if declaredLength > 0 =>
            // the declared length is reported on the entity, but no body bytes are consumed from `input`
            emitResponseStart(
              StrictEntityCreator(HttpEntity.Default(contentType(cth), declaredLength, Source.empty)))
            endMessageAndContinueAt(bodyStart)
          case _ =>
            finishEmptyResponse()
        }

      case HttpMethods.CONNECT =>
        finishEmptyResponse()

      case _ if isChunked =>
        if (clh.isDefined) failMessageStart("A chunked response must not contain a Content-Length header")
        else {
          emitResponseStart(chunkedEntity(cth), responseHeaders = headers)
          parseChunk(input, bodyStart, closeAfterResponseCompletion, totalBytesRead = 0L)
        }

      case _ =>
        clh match {
          case None =>
            // no length information at all: the entity data runs until the connection is closed
            emitResponseStart(StreamedEntityCreator { parts =>
              HttpEntity.CloseDelimited(contentType(cth), parts.collect { case EntityPart(bytes) => bytes })
            })
            setCompletionHandling(HttpMessageParser.CompletionOk)
            parseToCloseBody(input, bodyStart, totalBytesRead = 0)

          case Some(`Content-Length`(declaredLength)) if declaredLength == 0 =>
            finishEmptyResponse()

          case Some(`Content-Length`(declaredLength)) if declaredLength <= input.size - bodyStart =>
            // the whole body is already contained in `input`, so a strict entity can be produced right away
            val bodyLength = declaredLength.toInt
            emitResponseStart(strictEntity(cth, input, bodyStart, bodyLength))
            endMessageAndContinueAt(bodyStart + bodyLength)

          case Some(`Content-Length`(declaredLength)) =>
            emitResponseStart(defaultEntity(cth, declaredLength))
            parseFixedLengthBody(declaredLength, closeAfterResponseCompletion)(input, bodyStart)
        }
    }
}