override def createLogic()

in http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/client/PersistentConnection.scala [82:262]


    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new TimerGraphStageLogic(shape) with StageLogging {
        become(Unconnected)

        def become(state: State): Unit = setHandlers(requestIn, responseOut, state)

        trait State extends InHandler with OutHandler
        object Unconnected extends State {
          override def onPush(): Unit = connect(maxAttempts, Duration.Zero)
          override def onPull(): Unit =
            if (!isAvailable(requestIn) && !hasBeenPulled(requestIn)) // requestIn might already have been pulled when we failed and went back to Unconnected
              pull(requestIn)
        }

        def connect(connectsLeft: Option[Int], lastEmbargo: FiniteDuration): Unit = {
          val requestOut = new SubSourceOutlet[HttpRequest]("PersistentConnection.requestOut")
          val responseIn = new SubSinkInlet[HttpResponse]("PersistentConnection.responseIn")
          val connection = Promise[OutgoingConnection]()

          become(new Connecting(connection.future, requestOut, responseIn, connectsLeft.map(_ - 1), lastEmbargo))

          connection.completeWith(Source.fromGraph(requestOut.source)
            .viaMat(connectionFlow)(Keep.right)
            .toMat(responseIn.sink)(Keep.left)
            .run()(subFusingMaterializer))
        }

        class Connecting(
            connected: Future[OutgoingConnection],
            requestOut: SubSourceOutlet[HttpRequest],
            responseIn: SubSinkInlet[HttpResponse],
            connectsLeft: Option[Int],
            lastEmbargo: FiniteDuration) extends State {
          connected.onComplete {
            case Success(_) =>
              onConnected.invoke(())
            case Failure(cause) =>
              onFailed.invoke(cause)
          }(ExecutionContexts.parasitic)

          var requestOutPulled = false
          requestOut.setHandler(new OutHandler {
            override def onPull(): Unit =
              requestOutPulled = true
            override def onDownstreamFinish(cause: Throwable): Unit = ()
          })
          responseIn.setHandler(new InHandler {
            override def onPush(): Unit = throw new IllegalStateException("no response push expected while connecting")
            override def onUpstreamFinish(): Unit = ()
            override def onUpstreamFailure(ex: Throwable): Unit = ()
          })

          override def onPush(): Unit = () // Pull might have happened before the connection failed. Element is kept in slot.

          override def onPull(): Unit = {
            if (!isAvailable(requestIn) && !hasBeenPulled(requestIn)) // requestIn might already have been pulled when we failed and went back to Unconnected
              pull(requestIn)
          }

          val onConnected = getAsyncCallback[Unit] { _ =>
            val newState = new Connected(requestOut, responseIn)
            become(newState)
            if (requestOutPulled) {
              if (isAvailable(requestIn)) newState.dispatchRequest(grab(requestIn))
              else if (!hasBeenPulled(requestIn)) pull(requestIn)
            }
          }
          val onFailed = getAsyncCallback[Throwable] { cause =>
            // If the materialized value is failed, then the stream should be broken by design.
            // Nevertheless also kick our ends of the stream.
            responseIn.cancel()
            requestOut.fail(new StreamTcpException("connection broken"))

            if (connectsLeft.contains(0)) {
              failStage(new RuntimeException(s"Connection failed after $maxAttempts attempts", cause))
            } else {
              setHandler(requestIn, Unconnected)
              if (baseEmbargo == Duration.Zero) {
                log.info(s"Connection attempt failed: ${cause.getMessage}. Trying to connect again${connectsLeft.map(
                    n => s" ($n attempts left)").getOrElse("")}.")
                connect(connectsLeft, Duration.Zero)
              } else {
                val embargo = lastEmbargo match {
                  case Duration.Zero => baseEmbargo
                  case otherValue    => (otherValue * 2).min(maxBaseEmbargo)
                }
                val minMillis = embargo.toMillis
                val maxMillis = minMillis * 2
                val backoff = ThreadLocalRandom.current().nextLong(minMillis, maxMillis).millis
                log.info(
                  s"Connection attempt failed: ${cause.getMessage}. Trying to connect again after backoff ${PrettyDuration.format(
                      backoff)} ${connectsLeft.map(n => s" ($n attempts left)").getOrElse("")}.")
                scheduleOnce(EmbargoEnded(connectsLeft, embargo), backoff)
              }
            }
          }
        }

        override def onTimer(timerKey: Any): Unit = {
          timerKey match {
            case EmbargoEnded(connectsLeft, nextEmbargo) =>
              log.debug("Reconnecting after backoff")
              connect(connectsLeft, nextEmbargo)
          }
        }

        class Connected(
            requestOut: SubSourceOutlet[HttpRequest],
            responseIn: SubSinkInlet[HttpResponse]) extends State {
          private var ongoingRequests: Map[AssociationTag, Map[AttributeKey[_], RequestResponseAssociation]] = Map.empty
          responseIn.pull()

          requestOut.setHandler(new OutHandler {
            override def onPull(): Unit =
              if (!isAvailable(requestIn)) pull(requestIn)
              else dispatchRequest(grab(requestIn))

            override def onDownstreamFinish(cause: Throwable): Unit = onDisconnected()
          })
          responseIn.setHandler(new InHandler {
            override def onPush(): Unit = {
              val response = responseIn.grab()
              val tag = response.attribute(associationTagKey).get
              require(ongoingRequests.contains(tag))
              ongoingRequests -= tag
              push(responseOut, response.removeAttribute(associationTagKey))
            }

            override def onUpstreamFinish(): Unit = onDisconnected()
            override def onUpstreamFailure(ex: Throwable): Unit = onDisconnected() // FIXME: log error
          })
          def onDisconnected(): Unit = {
            emitMultiple[HttpResponse](responseOut,
              ongoingRequests.values.map(errorResponse.withAttributes(_)).toVector,
              () => setHandler(responseOut, Unconnected))
            responseIn.cancel()
            requestOut.fail(new RuntimeException("connection broken"))

            if (isClosed(requestIn)) {
              // user closed PersistentConnection before and we were waiting for remaining responses
              completeStage()
            } else {
              // become(Unconnected) doesn't work because of using emit
              // so we need to do it more carefully here
              setHandler(requestIn, Unconnected)
              if (isAvailable(responseOut) && !hasBeenPulled(requestIn)) pull(requestIn)
            }
          }

          def dispatchRequest(req: HttpRequest): Unit = {
            val tag = new AssociationTag
            // Some cross-compilation woes here:
            // Explicit type ascription is needed to make both 2.12 and 2.13 compile.
            ongoingRequests = ongoingRequests.updated(tag,
              req.attributes.collect({
                case (key, value: RequestResponseAssociation) => key -> value
              }: PartialFunction[(AttributeKey[_], Any), (AttributeKey[_], RequestResponseAssociation)]))
            requestOut.push(req.addAttribute(associationTagKey, tag))
          }

          override def onPush(): Unit = dispatchRequest(grab(requestIn))
          override def onPull(): Unit = responseIn.pull()

          // onUpstreamFinish expects "reasonable behavior" from downstream stages, i.e. that
          // the downstream stage will eventually close all remaining inputs/outputs. Note
          // that the PersistentConnection is often used in combination with HTTP/2 connections
          // which to timeout if the stage completion stalls.
          override def onUpstreamFinish(): Unit = requestOut.complete()

          override def onUpstreamFailure(ex: Throwable): Unit = {
            requestOut.fail(ex)
            responseIn.cancel()
            failStage(ex)
          }
          override def onDownstreamFinish(cause: Throwable): Unit = {
            requestOut.complete()
            responseIn.cancel()
            super.onDownstreamFinish(cause)
          }
        }
      }