override def initialAttributes = Attributes.name()

in http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/OutgoingConnectionBlueprint.scala [303:374]


    override def initialAttributes = Attributes.name("ResponseParsingMerge")

    val shape = new FanInShape2(dataIn, bypassIn, responseOut)

    override def createLogic(effectiveAttributes: Attributes) = new GraphStageLogic(shape) {
      // each connection uses a single (private) response parser instance for all its responses
      // which builds a cache of all header instances seen on that connection
      val parser = rootParser.createShallowCopy()
      var waitingForMethod = true
      var completeStagePending = false

      setHandler(bypassIn,
        new InHandler {
          override def onPush(): Unit = {
            val responseContext = grab(bypassIn)
            parser.setContextForNextResponse(responseContext)
            val output = parser.parseBytes(ByteString.empty)
            drainParser(output)
          }
          override def onUpstreamFinish(): Unit =
            if (waitingForMethod) completeStage()
        })

      setHandler(dataIn,
        new InHandler {
          override def onPush(): Unit = {
            val bytes = grab(dataIn)
            val output = parser.parseSessionBytes(bytes)
            drainParser(output)
          }
          override def onUpstreamFinish(): Unit =
            if (waitingForMethod) completeStage()
            else {
              if (parser.onUpstreamFinish()) {
                completeStage()
              } else {
                completeStagePending = true
                emit(responseOut, parser.onPull() :: Nil, () => completeStage())
              }
            }
        })

      setHandler(responseOut, eagerTerminateOutput)

      val getNextMethod = () => {
        waitingForMethod = true
        if (isClosed(bypassIn)) completeStage()
        else pull(bypassIn)
      }

      val getNextData = () => {
        waitingForMethod = false
        if (isClosed(dataIn)) {
          if (!completeStagePending)
            completeStage()
        } else pull(dataIn)
      }

      @tailrec def drainParser(current: ResponseOutput, b: ListBuffer[ResponseOutput] = ListBuffer.empty): Unit = {
        def e(output: List[ResponseOutput], andThen: () => Unit): Unit =
          if (output.nonEmpty) emit(responseOut, output, andThen)
          else andThen()
        current match {
          case NeedNextRequestMethod => e(b.result(), getNextMethod)
          case StreamEnd             => e(b.result(), () => completeStage())
          case NeedMoreData          => e(b.result(), getNextData)
          case x                     => drainParser(parser.onPull(), b += x)
        }
      }

      override def preStart(): Unit = getNextMethod()
    }