// Overrides `initialAttributes` with the result of `Attributes.name()`.
// NOTE(review): no name argument is passed here, whereas the definition further down passes
// "ProtocolSwitchStage" — this line looks like a truncated excerpt of that definition; confirm.
override def initialAttributes = Attributes.name()

in http-core/src/main/scala/org/apache/pekko/http/impl/engine/server/HttpServerBluePrint.scala [679:823]


    /** Initial attributes of this stage: only the name "ProtocolSwitchStage". */
    override def initialAttributes: Attributes = Attributes.name("ProtocolSwitchStage")

    // Bidirectional shape built from the four ports, passed in the order fromHttp, toNet, fromNet, toHttp.
    override val shape = BidiShape(fromHttp, toNet, fromNet, toHttp)

    /**
     * Creates the logic of this stage.
     *
     * Until a `SwitchToOtherProtocol` element arrives on `fromHttp`, the logic forwards `HttpData` bytes
     * from `fromHttp` to `toNet` and every element from `fromNet` to `toHttp`. When the switch element
     * arrives, its bytes are pushed to `toNet`, both HTTP-side ports are closed, and the handler flow carried
     * by the element is run between `fromNet` and `toNet` through a sub-source / sub-sink pair.
     *
     * @param inheritedAttributes not read by this logic
     * @return a timer-capable logic bound to `shape`
     */
    def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
      import pekko.http.impl.engine.rendering.ResponseRenderingOutput._

      /*
       * These handlers are in charge until a switch command comes in, then they
       * are replaced.
       */

      setHandler(fromHttp,
        new InHandler {
          override def onPush(): Unit =
            grab(fromHttp) match {
              case HttpData(b) => push(toNet, b)
              case SwitchToOtherProtocol(bytes, handlerFlow) =>
                // Emit the last bytes of the HTTP side, close both HTTP-side ports,
                // then hand the network ports over to the new protocol's flow.
                push(toNet, bytes)
                complete(toHttp)
                cancel(fromHttp)
                switchToOtherProtocol(handlerFlow)
            }
          override def onUpstreamFinish(): Unit = complete(toNet)
          override def onUpstreamFailure(ex: Throwable): Unit = fail(toNet, ex)
        })
      setHandler(toNet,
        new OutHandler {
          override def onPull(): Unit = pull(fromHttp)
          override def onDownstreamFinish(): Unit = completeStage()
        })

      setHandler(fromNet,
        new InHandler {
          override def onPush(): Unit = push(toHttp, grab(fromNet))
          override def onUpstreamFinish(): Unit = complete(toHttp)
          override def onUpstreamFailure(ex: Throwable): Unit = fail(toHttp, ex)
        })
      setHandler(toHttp,
        new OutHandler {
          override def onPull(): Unit = pull(fromNet)
          override def onDownstreamFinish(): Unit = cancel(fromNet)
        })

      // Number of SubscriptionTimeout timers that were scheduled and have neither fired nor been cancelled.
      // `setKeepGoing(true)` is held exactly while this counter is non-zero.
      private var activeTimers = 0
      // Subscription timeout as configured in the materializer settings.
      private def timeout = ActorMaterializerHelper.downcast(materializer).settings.subscriptionTimeoutSettings.timeout
      // Schedules `s` once after `timeout` and records it in `activeTimers`.
      private def addTimeout(s: SubscriptionTimeout): Unit = {
        if (activeTimers == 0) setKeepGoing(true)
        activeTimers += 1
        scheduleOnce(s, timeout)
      }
      // Cancels `s` if it is still pending; does nothing (and leaves the counter alone) otherwise.
      private def cancelTimeout(s: SubscriptionTimeout): Unit =
        if (isTimerActive(s)) {
          activeTimers -= 1
          if (activeTimers == 0) setKeepGoing(false)
          cancelTimer(s)
        }
      // Only SubscriptionTimeout keys are scheduled by this logic; the wrapped callback runs after
      // the bookkeeping has been updated.
      override def onTimer(timerKey: Any): Unit = timerKey match {
        case SubscriptionTimeout(f) =>
          activeTimers -= 1
          if (activeTimers == 0) setKeepGoing(false)
          f()
      }

      /**
       * Replaces the HTTP handlers and runs `newFlow` between `fromNet` (as its source) and `toNet`
       * (as its sink).
       *
       * @param newFlow handler flow of the protocol that takes over the connection
       */
      def switchToOtherProtocol(newFlow: Flow[ByteString, ByteString, Any]): Unit = {

        // Output of newFlow is forwarded to toNet, including completion and failure.
        val sinkIn = new SubSinkInlet[ByteString]("FrameSink")
        sinkIn.setHandler(new InHandler {
          override def onPush(): Unit = push(toNet, sinkIn.grab())
          override def onUpstreamFinish(): Unit = complete(toNet)
          override def onUpstreamFailure(ex: Throwable): Unit = fail(toNet, ex)
        })

        if (isClosed(fromNet)) {
          // Network input is already closed: run the flow with an empty source.
          setHandler(toNet,
            new OutHandler {
              override def onPull(): Unit = sinkIn.pull()
              override def onDownstreamFinish(): Unit = {
                completeStage()
                sinkIn.cancel()
              }
            })
          newFlow.runWith(Source.empty, sinkIn.sink)(subFusingMaterializer)
        } else {
          val sourceOut = new SubSourceOutlet[ByteString]("FrameSource")

          // On expiry, delegate to the sub-source's own timeout handling; if that left the
          // sub-source closed, complete the whole stage.
          val timeoutKey = SubscriptionTimeout(() => {
            sourceOut.timeout(timeout)
            if (sourceOut.isClosed) completeStage()
          })
          addTimeout(timeoutKey)

          setHandler(toNet,
            new OutHandler {
              override def onPull(): Unit = sinkIn.pull()
              override def onDownstreamFinish(): Unit = {
                completeStage()
                sinkIn.cancel()
                sourceOut.complete()
              }
            })

          setHandler(fromNet,
            new InHandler {
              override def onPush(): Unit = {
                // If the sub-source has not been pulled yet the element is left in the port;
                // it is picked up by the first pull of sourceOut below.
                if (sourceOut.isAvailable) {
                  sourceOut.push(grab(fromNet).bytes)
                }
              }
              // Do not complete the sub-source while an element is still parked in fromNet,
              // otherwise that element would be dropped: the first pull delivers it and then
              // completes the sub-source.
              override def onUpstreamFinish(): Unit =
                if (!isAvailable(fromNet)) sourceOut.complete()
              override def onUpstreamFailure(ex: Throwable): Unit = sourceOut.fail(ex)
            })
          sourceOut.setHandler(new OutHandler {
            override def onPull(): Unit = {
              // This check only needed on the first pull due to potential element
              // pushed in response to pull by previous source
              if (isAvailable(fromNet)) {
                sourceOut.push(grab(fromNet).bytes)
                // fromNet finished while the element was parked: the completion deferred by
                // the fromNet handler is signalled now that the element has been delivered.
                if (isClosed(fromNet)) sourceOut.complete()
              } else if (!hasBeenPulled(fromNet)) {
                pull(fromNet)
              }
              cancelTimeout(timeoutKey)

              // From the second pull on, only plain demand forwarding is needed.
              sourceOut.setHandler(new OutHandler {
                override def onPull(): Unit = if (!hasBeenPulled(fromNet)) pull(fromNet)
                override def onDownstreamFinish(): Unit = cancel(fromNet)
              })
            }
            override def onDownstreamFinish(): Unit = cancel(fromNet)
          })

          // disable the old handlers, at this point we might still get something due to cancellation delay which we need to ignore
          setHandlers(fromHttp, toHttp,
            new InHandler with OutHandler {
              override def onPush(): Unit = ()
              override def onPull(): Unit = ()
              override def onUpstreamFinish(): Unit = ()
              override def onUpstreamFailure(ex: Throwable): Unit = ()
              override def onDownstreamFinish(): Unit = ()
            })

          newFlow.runWith(sourceOut.source, sinkIn.sink)(subFusingMaterializer)
        }
      }
    }