in http-core/src/main/scala/org/apache/pekko/http/impl/engine/parsing/HttpRequestParser.scala [56:258]
override protected def initialAttributes: Attributes = Attributes.name("HttpRequestParser")
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with HttpMessageParser[RequestOutput] with InHandler with OutHandler {
import HttpMessageParser._
override val settings = self.settings
override val headerParser = self.headerParser.createShallowCopy()
override val isResponseParser = false
private[this] var method: HttpMethod = _
private[this] var uri: Uri = _
private[this] var uriBytes: ByteString = _
override def onPush(): Unit = handleParserOutput(parseSessionBytes(grab(in)))
override def onPull(): Unit = handleParserOutput(doPull())
override def onUpstreamFinish(): Unit =
if (super.shouldComplete()) completeStage()
else if (isAvailable(out)) handleParserOutput(doPull())
setHandlers(in, out, this)
private def handleParserOutput(output: RequestOutput): Unit = {
output match {
case StreamEnd => completeStage()
case NeedMoreData => pull(in)
case x => push(out, x)
}
}
override def parseMessage(input: ByteString, offset: Int): StateResult =
if (offset < input.length) {
var cursor = parseMethod(input, offset)
cursor = parseRequestTarget(input, cursor)
cursor = parseProtocol(input, cursor)
if (byteChar(input, cursor) == '\r' && byteChar(input, cursor + 1) == '\n')
parseHeaderLines(input, cursor + 2)
else if (byteChar(input, cursor) == '\n')
parseHeaderLines(input, cursor + 1)
else onBadProtocol(input.drop(cursor))
} else
// Without HTTP pipelining it's likely that buffer is exhausted after reading one message,
// so we check above explicitly if we are done and stop work here without running into NotEnoughDataException
// when continuing to parse.
continue(startNewMessage)
def parseMethod(input: ByteString, cursor: Int): Int = {
@tailrec def parseCustomMethod(ix: Int = 0, sb: JStringBuilder = new JStringBuilder(16)): Int =
if (ix < maxMethodLength) {
byteChar(input, cursor + ix) match {
case ' ' =>
customMethods(sb.toString) match {
case Some(m) =>
method = m
cursor + ix + 1
case None =>
throw new ParsingException(NotImplemented, ErrorInfo("Unsupported HTTP method", sb.toString))
}
case c => parseCustomMethod(ix + 1, sb.append(c))
}
} else
throw new ParsingException(
BadRequest,
ErrorInfo("Unsupported HTTP method",
s"HTTP method too long (started with '${sb.toString}')$remoteAddressStr. " +
"Increase `pekko.http.server.parsing.max-method-length` to support HTTP methods with more characters."))
@tailrec def parseMethod(meth: HttpMethod, ix: Int = 1): Int =
if (ix == meth.value.length)
if (byteChar(input, cursor + ix) == ' ') {
method = meth
cursor + ix + 1
} else parseCustomMethod()
else if (byteChar(input, cursor + ix) == meth.value.charAt(ix)) parseMethod(meth, ix + 1)
else parseCustomMethod()
import HttpMethods._
(byteChar(input, cursor): @switch) match {
case 'G' => parseMethod(GET)
case 'P' => byteChar(input, cursor + 1) match {
case 'O' => parseMethod(POST, 2)
case 'U' => parseMethod(PUT, 2)
case 'A' => parseMethod(PATCH, 2)
case _ => parseCustomMethod()
}
case 'D' => parseMethod(DELETE)
case 'H' => parseMethod(HEAD)
case 'O' => parseMethod(OPTIONS)
case 'T' => parseMethod(TRACE)
case 'C' => parseMethod(CONNECT)
case 0x16 =>
throw new ParsingException(
BadRequest,
ErrorInfo(
"Unsupported HTTP method",
s"The HTTP method started with 0x16 rather than any known HTTP method$remoteAddressStr. " +
"Perhaps this was an HTTPS request sent to an HTTP endpoint?"))
case _ => parseCustomMethod()
}
}
val uriParser = new UriParser(null: ParserInput, uriParsingMode = uriParsingMode)
def parseRequestTarget(input: ByteString, cursor: Int): Int = {
val uriStart = cursor
val uriEndLimit = cursor + maxUriLength
@tailrec def findUriEnd(ix: Int = cursor): Int =
if (ix == input.length) throw NotEnoughDataException
else if (CharacterClasses.WSPCRLF(input(ix).toChar)) ix
else if (ix < uriEndLimit) findUriEnd(ix + 1)
else throw new ParsingException(
UriTooLong,
s"URI length exceeds the configured limit of $maxUriLength characters$remoteAddressStr")
val uriEnd = findUriEnd()
try {
uriBytes = input.slice(uriStart, uriEnd)
uriParser.reset(new ByteStringParserInput(uriBytes))
uri = uriParser.parseHttpRequestTarget()
} catch {
case IllegalUriException(info) => throw new ParsingException(BadRequest, info)
}
uriEnd + 1
}
override def onBadProtocol(input: ByteString): Nothing = throw new ParsingException(HttpVersionNotSupported, "")
// http://tools.ietf.org/html/rfc7230#section-3.3
override def parseEntity(headers: List[HttpHeader], protocol: HttpProtocol, input: ByteString, bodyStart: Int,
clh: Option[`Content-Length`], cth: Option[`Content-Type`], isChunked: Boolean,
expect100continue: Boolean, hostHeaderPresent: Boolean, closeAfterResponseCompletion: Boolean,
sslSession: SSLSession): StateResult =
if (hostHeaderPresent || protocol == HttpProtocols.`HTTP/1.0`) {
def emitRequestStart(
createEntity: EntityCreator[RequestOutput, RequestEntity],
headers: List[HttpHeader] = headers) = {
val allHeaders0 =
if (rawRequestUriHeader)
`Raw-Request-URI`(uriBytes.decodeString(HttpCharsets.`US-ASCII`.nioCharset)) :: headers
else headers
val attributes: Map[AttributeKey[_], Any] =
if (settings.includeSslSessionAttribute) Map(AttributeKeys.sslSession -> SslSessionInfo(sslSession))
else Map.empty
val requestStart =
if (method == HttpMethods.GET) {
Handshake.Server.websocketUpgrade(headers, hostHeaderPresent, websocketSettings,
headerParser.log) match {
case OptionVal.Some(upgrade) =>
RequestStart(method, uri, protocol, attributes.updated(AttributeKeys.webSocketUpgrade, upgrade),
upgrade :: allHeaders0, createEntity, expect100continue, closeAfterResponseCompletion)
case OptionVal.None =>
RequestStart(method, uri, protocol, attributes, allHeaders0, createEntity, expect100continue,
closeAfterResponseCompletion)
}
} else RequestStart(method, uri, protocol, attributes, allHeaders0, createEntity, expect100continue,
closeAfterResponseCompletion)
emit(requestStart)
}
if (!isChunked) {
val contentLength = clh match {
case Some(`Content-Length`(len)) => len
case None => 0
}
if (contentLength == 0) {
emitRequestStart(emptyEntity(cth))
setCompletionHandling(HttpMessageParser.CompletionOk)
startNewMessage(input, bodyStart)
} else if (!method.isEntityAccepted) {
failMessageStart(UnprocessableEntity, s"${method.name} requests must not have an entity")
} else if (contentLength <= input.size - bodyStart) {
val cl = contentLength.toInt
emitRequestStart(strictEntity(cth, input, bodyStart, cl))
setCompletionHandling(HttpMessageParser.CompletionOk)
startNewMessage(input, bodyStart + cl)
} else {
emitRequestStart(defaultEntity(cth, contentLength))
parseFixedLengthBody(contentLength, closeAfterResponseCompletion)(input, bodyStart)
}
} else {
if (!method.isEntityAccepted) {
failMessageStart(UnprocessableEntity, s"${method.name} requests must not have an entity")
} else {
if (clh.isEmpty) {
emitRequestStart(chunkedEntity(cth), headers)
parseChunk(input, bodyStart, closeAfterResponseCompletion, totalBytesRead = 0L)
} else failMessageStart("A chunked request must not contain a Content-Length header")
}
}
} else failMessageStart("Request is missing required `Host` header")
private def remoteAddressStr: String =
inheritedAttributes.get[HttpAttributes.RemoteAddress].map(_.address) match {
case Some(addr) => s" from ${addr.getHostString}:${addr.getPort}"
case None => ""
}
}