in http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/hpack/HeaderCompression.scala [38:101]
def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with StageLogging with InHandler with OutHandler { logic =>
setHandlers(eventsIn, eventsOut, this)
private val currentMaxFrameSize = Http2Protocol.InitialMaxFrameSize
val encoder = new pekko.http.shaded.com.twitter.hpack.Encoder(Http2Protocol.InitialMaxHeaderTableSize)
val os = new ByteArrayOutputStream(128)
def onPull(): Unit = pull(eventsIn)
def onPush(): Unit = grab(eventsIn) match {
case ack @ SettingsAckFrame(s) =>
applySettings(s)
push(eventsOut, ack)
case ParsedHeadersFrame(streamId, endStream, kvs, prioInfo) =>
// When ending the stream without any payload, use a DATA frame rather than
// a HEADERS frame to work around https://github.com/golang/go/issues/47851.
if (endStream && kvs.isEmpty) push(eventsOut, DataFrame(streamId, endStream, ByteString.empty))
else {
kvs.foreach {
case (key, value: String) =>
encoder.encodeHeader(os, key, value, false)
case (key, value) =>
throw new IllegalStateException(
s"Didn't expect key-value-pair [$key] -> [$value](${value.getClass}) here.")
}
val result = ByteString.fromArrayUnsafe(os.toByteArray) // BAOS.toByteArray always creates a copy
os.reset()
if (result.size <= currentMaxFrameSize)
push(eventsOut, HeadersFrame(streamId, endStream, endHeaders = true, result, prioInfo))
else {
val first =
HeadersFrame(streamId, endStream, endHeaders = false, result.take(currentMaxFrameSize), prioInfo)
push(eventsOut, first)
setHandler(eventsOut,
new OutHandler {
private var remainingData = result.drop(currentMaxFrameSize)
def onPull(): Unit = {
val thisFragment = remainingData.take(currentMaxFrameSize)
val rest = remainingData.drop(currentMaxFrameSize)
val last = rest.isEmpty
push(eventsOut, ContinuationFrame(streamId, endHeaders = last, thisFragment))
if (last) setHandler(eventsOut, logic)
else remainingData = rest
}
})
}
}
case x => push(eventsOut, x)
}
def applySettings(s: immutable.Seq[Setting]): Unit =
s.foreach {
case Setting(SettingIdentifier.SETTINGS_HEADER_TABLE_SIZE, size) =>
log.debug("Applied SETTINGS_HEADER_TABLE_SIZE({}) in header compression", size)
// 'size' is strictly spoken unsigned, but the encoder is allowed to
// pick any size equal to or less than this value (6.5.2)
if (size >= 0) encoder.setMaxHeaderTableSize(os, size)
else encoder.setMaxHeaderTableSize(os, Int.MaxValue)
case _ => // ignore, not applicable to this stage
}
}