in http-core/src/main/scala/org/apache/pekko/http/impl/engine/parsing/HttpMessageParser.scala [257:329]
protected final def parseChunk(
input: ByteString, offset: Int, isLastMessage: Boolean, totalBytesRead: Long): StateResult = {
@tailrec def parseTrailer(extension: String, lineStart: Int, headers: List[HttpHeader] = Nil,
headerCount: Int = 0): StateResult = {
var errorInfo: ErrorInfo = null
val lineEnd =
try headerParser.parseHeaderLine(input, lineStart)()
catch { case e: ParsingException => errorInfo = e.info; 0 }
if (errorInfo eq null) {
headerParser.resultHeader match {
case EmptyHeader =>
val lastChunk =
if (extension.isEmpty && headers.isEmpty) HttpEntity.LastChunk
else HttpEntity.LastChunk(extension, headers)
emit(EntityChunk(lastChunk))
emit(MessageEnd)
setCompletionHandling(CompletionOk)
if (isLastMessage) terminate()
else startNewMessage(input, lineEnd)
case header if headerCount < settings.maxHeaderCount =>
parseTrailer(extension, lineEnd, header :: headers, headerCount + 1)
case _ => failEntityStream(
s"Chunk trailer contains more than the configured limit of ${settings.maxHeaderCount} headers")
}
} else failEntityStream(errorInfo)
}
def parseChunkBody(chunkSize: Int, extension: String, cursor: Int): StateResult =
if (chunkSize > 0) {
val chunkBodyEnd = cursor + chunkSize
def result(terminatorLen: Int) = {
emit(EntityChunk(HttpEntity.Chunk(input.slice(cursor, chunkBodyEnd).compact, extension)))
Trampoline(_ => parseChunk(input, chunkBodyEnd + terminatorLen, isLastMessage, totalBytesRead + chunkSize))
}
byteChar(input, chunkBodyEnd) match {
case '\r' if byteChar(input, chunkBodyEnd + 1) == '\n' => result(2)
case '\n' => result(1)
case x => failEntityStream("Illegal chunk termination")
}
} else parseTrailer(extension, cursor)
@tailrec def parseChunkExtensions(chunkSize: Int, cursor: Int)(startIx: Int = cursor): StateResult =
if (cursor - startIx <= settings.maxChunkExtLength) {
def extension = asciiString(input, startIx, cursor)
byteChar(input, cursor) match {
case '\r' if byteChar(input, cursor + 1) == '\n' => parseChunkBody(chunkSize, extension, cursor + 2)
case '\n' => parseChunkBody(chunkSize, extension, cursor + 1)
case _ => parseChunkExtensions(chunkSize, cursor + 1)(startIx)
}
} else failEntityStream(
s"HTTP chunk extension length exceeds configured limit of ${settings.maxChunkExtLength} characters")
@tailrec def parseSize(cursor: Int, size: Long): StateResult =
if (size <= Int.MaxValue) {
byteChar(input, cursor) match {
case c if CharacterClasses.HEXDIG(c) => parseSize(cursor + 1, size * 16 + CharUtils.hexValue(c))
case c if size > settings.maxChunkSize =>
failEntityStream(
s"HTTP chunk of $size bytes exceeds the configured limit of ${settings.maxChunkSize} bytes")
case ';' if cursor > offset => parseChunkExtensions(size.toInt, cursor + 1)()
case '\r' if cursor > offset && byteChar(input, cursor + 1) == '\n' =>
parseChunkBody(size.toInt, "", cursor + 2)
case '\n' if cursor > offset => parseChunkBody(size.toInt, "", cursor + 1)
case c if CharacterClasses.WSP(c) => parseSize(cursor + 1, size) // illegal according to the spec but can happen, see issue #1812
case c => failEntityStream(s"Illegal character '${escape(c)}' in chunk start")
}
} else failEntityStream(s"HTTP chunk size exceeds Integer.MAX_VALUE (${Int.MaxValue}) bytes")
try parseSize(offset, 0)
catch {
case NotEnoughDataException => continue(input, offset)(parseChunk(_, _, isLastMessage, totalBytesRead))
}
}