in http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/hpack/HeaderDecompression.scala [50:141]
/**
 * Creates the logic of the header decompression stage.
 *
 * The logic is a two-state machine:
 *  - `Idle`: no header block is being assembled. A `HeadersFrame` with `endHeaders` set is decoded
 *    right away; one without it switches to `ReceivingHeaders`.
 *  - `ReceivingHeaders`: fragments of `ContinuationFrame`s for the same stream are appended until one
 *    arrives with `endHeaders` set, at which point the whole block is decoded and the state returns
 *    to `Idle`.
 *
 * Events not covered by the current state's `handleEvent` are left to `HandleOrPassOnStage`
 * (presumably passed through unchanged -- confirm against that base class).
 *
 * @param inheritedAttributes attributes handed in by the materializer; not read by this logic
 * @return the stage logic instance
 */
def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
  new HandleOrPassOnStage[FrameEvent, FrameEvent](shape) {
    // Shallow copy of the master parser, owned by this logic instance.
    val httpHeaderParser = masterHeaderParser.createShallowCopy()
    // A single HPACK decoder instance is reused for every header block handled by this logic instance.
    val decoder = new pekko.http.shaded.com.twitter.hpack.Decoder(Http2Protocol.InitialMaxHeaderListSize,
      Http2Protocol.InitialMaxHeaderTableSize)

    become(Idle)

    // Simple state machine:
    //   Idle:             no ongoing HEADERS parsing
    //   ReceivingHeaders: a HEADERS frame without END_HEADERS was seen, waiting for CONTINUATION frames

    /**
     * Decodes a complete header block and pushes the resulting `ParsedHeadersFrame` downstream.
     *
     * If the decoder throws an `IOException`, the output is failed with a `COMPRESSION_ERROR`
     * protocol exception instead. Exceptions of other types raised while parsing individual
     * headers are not caught here.
     *
     * @param streamId  stream the header block belongs to
     * @param endStream end-of-stream flag, copied into the emitted frame
     * @param payload   the complete (already concatenated) header block
     * @param prioInfo  optional priority information, copied into the emitted frame
     */
    def parseAndEmit(
        streamId: Int, endStream: Boolean, payload: ByteString, prioInfo: Option[PriorityFrame]): Unit = {
      val headers = new VectorBuilder[(String, AnyRef)]

      // Callback invoked by the decoder for each header field. Whatever is returned is handed back to the
      // decoder; the `sensitive` flag is not used here.
      object Receiver extends HeaderListener {
        def addHeader(name: String, value: String, parsed: AnyRef, sensitive: Boolean): AnyRef = {
          val result: AnyRef =
            if (parsed ne null) parsed // the decoder already supplied a parsed representation, reuse it
            else {
              import Http2HeaderParsing._
              name match {
                case "content-type"   => ContentType.parse(name, value, parserSettings)
                case ":authority"     => Authority.parse(name, value, parserSettings)
                case ":path"          => PathAndQuery.parse(name, value, parserSettings)
                case ":method"        => Method.parse(name, value, parserSettings)
                case ":scheme"        => Scheme.parse(name, value, parserSettings)
                case "content-length" => ContentLength.parse(name, value, parserSettings)
                case "cookie"         => Cookie.parse(name, value, parserSettings)
                // Any other pseudo-header keeps its raw string value. `startsWith` is used instead of
                // indexing the first character so that an empty name cannot raise an index error here.
                case x if x.startsWith(":") => value
                case _                      =>
                  // cannot use OtherHeader.parse because that doesn't have access to the header parser
                  val header = parseHeaderPair(httpHeaderParser, name, value)
                  RequestParsing.validateHeader(header)
                  header
              }
            }
          headers += name -> result
          result
        }
      }

      try {
        decoder.decode(ByteStringInputStream(payload), Receiver)
        // NOTE(review): the return value is ignored; it may signal a condition that should be handled.
        decoder.endHeaderBlock() // TODO: do we have to check the result here?

        push(eventsOut, ParsedHeadersFrame(streamId, endStream, headers.result(), prioInfo))
      } catch {
        case _: IOException =>
          // this is signalled by the decoder when it failed, we want to react to this by rendering a GOAWAY frame
          fail(eventsOut,
            new Http2Compliance.Http2ProtocolException(ErrorCode.COMPRESSION_ERROR, "Decompression failed."))
      }
    }

    /** State while no header block is being assembled. */
    object Idle extends State {
      val handleEvent: PartialFunction[FrameEvent, Unit] = {
        case HeadersFrame(streamId, endStream, endHeaders, fragment, prioInfo) =>
          if (endHeaders) parseAndEmit(streamId, endStream, fragment, prioInfo)
          else {
            // Header block continues in CONTINUATION frames: keep the first fragment and ask for more input.
            become(new ReceivingHeaders(streamId, endStream, fragment, prioInfo))
            pull(eventsIn)
          }
        case c: ContinuationFrame =>
          // A CONTINUATION frame is only valid while a header block is being assembled.
          protocolError(s"Received unexpected continuation frame: $c")

        // FIXME: handle SETTINGS frames that change decompression parameters
      }
    }

    /**
     * State while a header block is being assembled from CONTINUATION frames.
     *
     * NOTE(review): fragments are concatenated without any size check in this block.
     *
     * @param streamId              stream whose header block is being assembled
     * @param endStream             end-of-stream flag of the initial HEADERS frame
     * @param initiallyReceivedData fragment carried by the initial HEADERS frame
     * @param priorityInfo          priority information of the initial HEADERS frame, if any
     */
    class ReceivingHeaders(streamId: Int, endStream: Boolean, initiallyReceivedData: ByteString,
        priorityInfo: Option[PriorityFrame]) extends State {
      // All header block fragments received so far, in arrival order.
      var receivedData = initiallyReceivedData

      val handleEvent: PartialFunction[FrameEvent, Unit] = {
        // Backticks: only matches a continuation for the stream this state was created for.
        case ContinuationFrame(`streamId`, endHeaders, payload) =>
          if (endHeaders) {
            parseAndEmit(streamId, endStream, receivedData ++ payload, priorityInfo)
            become(Idle)
          } else {
            receivedData ++= payload
            pull(eventsIn)
          }
        case x =>
          // Any other event (including a continuation for a different stream) is rejected.
          protocolError(s"While waiting for CONTINUATION frame on stream $streamId received unexpected frame $x")
      }
    }

    /** Fails the whole stage with an `Http2ProtocolException` carrying `msg`. */
    def protocolError(msg: String): Unit = failStage(new Http2ProtocolException(msg))
  }