def createLogic()

in http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/hpack/HeaderDecompression.scala [50:141]


  /**
   * Builds the stage logic that decodes HPACK-compressed header blocks and emits one
   * `ParsedHeadersFrame` per complete block.
   *
   * The logic switches between two states:
   *  - `Idle`: no header block is currently being assembled.
   *  - `ReceivingHeaders`: a HEADERS frame without the `endHeaders` flag was seen and its fragment
   *    is buffered until a CONTINUATION frame with `endHeaders` set arrives on the same stream.
   *
   * Frame events that the current state's `handleEvent` does not match are left to
   * `HandleOrPassOnStage` (presumably passed on unchanged -- confirm in that class).
   */
  def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new HandleOrPassOnStage[FrameEvent, FrameEvent](shape) {
      // Each logic instance works on its own shallow copy of the shared header parser.
      val httpHeaderParser = masterHeaderParser.createShallowCopy()
      val decoder = new pekko.http.shaded.com.twitter.hpack.Decoder(Http2Protocol.InitialMaxHeaderListSize,
        Http2Protocol.InitialMaxHeaderTableSize)

      become(Idle)

      /**
       * Decodes a complete header block and pushes the resulting `ParsedHeadersFrame` to `eventsOut`.
       * If the decoder throws an `IOException`, `eventsOut` is failed with a `COMPRESSION_ERROR`
       * protocol exception instead.
       */
      def parseAndEmit(
          streamId: Int, endStream: Boolean, payload: ByteString, prioInfo: Option[PriorityFrame]): Unit = {
        val collected = new VectorBuilder[(String, AnyRef)]

        // Stores the pair in the result and returns the parsed value, which is handed back to the decoder.
        def record(headerName: String, parsedValue: AnyRef): AnyRef = {
          collected += headerName -> parsedValue
          parsedValue
        }

        val listener = new HeaderListener {
          def addHeader(name: String, value: String, parsed: AnyRef, sensitive: Boolean): AnyRef =
            if (parsed eq null) {
              import Http2HeaderParsing._

              // No parsed representation was supplied: parse the raw value according to the header name.
              val freshlyParsed: AnyRef = name match {
                case "content-type"   => ContentType.parse(name, value, parserSettings)
                case ":authority"     => Authority.parse(name, value, parserSettings)
                case ":path"          => PathAndQuery.parse(name, value, parserSettings)
                case ":method"        => Method.parse(name, value, parserSettings)
                case ":scheme"        => Scheme.parse(name, value, parserSettings)
                case "content-length" => ContentLength.parse(name, value, parserSettings)
                case "cookie"         => Cookie.parse(name, value, parserSettings)
                // any other pseudo-header is kept as its raw string value
                case pseudo if pseudo(0) == ':' => value
                case _                =>
                  // OtherHeader.parse cannot be used here because it has no access to the header parser
                  val header = parseHeaderPair(httpHeaderParser, name, value)
                  RequestParsing.validateHeader(header)
                  header
              }
              record(name, freshlyParsed)
            } else {
              // The decoder already supplied a parsed value: reuse it as is.
              record(name, parsed)
            }
        }

        try {
          decoder.decode(ByteStringInputStream(payload), listener)
          decoder.endHeaderBlock() // TODO: do we have to check the result here?

          val frame = ParsedHeadersFrame(streamId, endStream, collected.result(), prioInfo)
          push(eventsOut, frame)
        } catch {
          case _: IOException =>
            // The decoder signals failure with an IOException; the intent is to have a GOAWAY frame
            // rendered in response to the resulting protocol exception.
            val compressionError =
              new Http2Compliance.Http2ProtocolException(ErrorCode.COMPRESSION_ERROR, "Decompression failed.")
            fail(eventsOut, compressionError)
        }
      }

      /** State while no header block is being assembled. */
      object Idle extends State {
        val handleEvent: PartialFunction[FrameEvent, Unit] = {
          // the header block is complete within this single frame
          case HeadersFrame(streamId, endStream, true, fragment, prioInfo) =>
            parseAndEmit(streamId, endStream, fragment, prioInfo)

          // more fragments will follow: buffer this one and ask for the next frame
          case HeadersFrame(streamId, endStream, false, fragment, prioInfo) =>
            become(new ReceivingHeaders(streamId, endStream, fragment, prioInfo))
            pull(eventsIn)

          case unexpected: ContinuationFrame =>
            protocolError(s"Received unexpected continuation frame: $unexpected")

          // FIXME: handle SETTINGS frames that change decompression parameters
        }
      }

      /**
       * State while a header block for `streamId` is being assembled from CONTINUATION frames.
       * Any frame other than a CONTINUATION frame for that stream fails the stage.
       */
      class ReceivingHeaders(streamId: Int, endStream: Boolean, initiallyReceivedData: ByteString,
          priorityInfo: Option[PriorityFrame]) extends State {
        var receivedData = initiallyReceivedData

        val handleEvent: PartialFunction[FrameEvent, Unit] = {
          case ContinuationFrame(`streamId`, endHeaders, payload) =>
            val accumulated = receivedData ++ payload
            if (endHeaders) {
              parseAndEmit(streamId, endStream, accumulated, priorityInfo)
              become(Idle)
            } else {
              receivedData = accumulated
              pull(eventsIn)
            }
          case other =>
            protocolError(s"While waiting for CONTINUATION frame on stream $streamId received unexpected frame $other")
        }
      }

      /** Fails the whole stage with an `Http2ProtocolException` carrying the given message. */
      def protocolError(message: String): Unit = {
        val failure = new Http2ProtocolException(message)
        failStage(failure)
      }
    }