private[pekko] def monitorChannel()

in runtime/src/main/scala/org/apache/pekko/grpc/internal/ChannelUtils.scala [71:103]


  /**
   * Tracks the connectivity state of `channel` and reflects it in the two promises.
   *
   * Starting from the channel's current state (queried without requesting a connection),
   * the monitor re-registers itself on every state change and reacts as follows:
   *
   *  - `READY`: completes `ready` successfully and resets the failure counter to zero.
   *  - `TRANSIENT_FAILURE`: increments the failure counter; when it reaches
   *    `maxConnectionAttempts`, fails `ready` (or `done`, if `ready` was already completed)
   *    with a [[ClientConnectionException]] and stops monitoring.
   *  - `SHUTDOWN`: completes `done` successfully and stops monitoring.
   *  - `IDLE` / `CONNECTING`: keeps the counter unchanged and continues monitoring.
   *
   * @param ready                 completed once the channel reaches `READY`, or failed when the
   *                              attempt limit is hit before that
   * @param done                  completed on `SHUTDOWN`, or failed when the attempt limit is hit
   *                              after `ready` was already completed
   * @param channel               the channel whose state is observed
   * @param maxConnectionAttempts number of consecutive `TRANSIENT_FAILURE` observations after
   *                              which monitoring gives up; `None` means never give up
   * @param log                   adapter used for debug output on every observed state
   */
  private[pekko] def monitorChannel(
      ready: Promise[Unit],
      done: Promise[Done],
      channel: ManagedChannel,
      maxConnectionAttempts: Option[Int],
      log: LoggingAdapter): Unit = {
    def monitor(currentState: ConnectivityState, connectionAttempts: Int): Unit = {
      // Template form: the message is only rendered when debug logging is enabled.
      log.debug("monitoring with state {} and connectionAttempts {}", currentState, connectionAttempts)
      val newAttemptOpt = currentState match {
        case ConnectivityState.TRANSIENT_FAILURE =>
          val failedAttempts = connectionAttempts + 1
          if (maxConnectionAttempts.contains(failedAttempts)) {
            // Report the plain number rather than the Option, which would render as "Some(n)".
            val ex = new ClientConnectionException(
              s"Unable to establish connection after [$failedAttempts] attempts")
            // Fail `ready` if it is still pending; otherwise the channel had been ready before,
            // so the failure is surfaced through `done` instead.
            ready.tryFailure(ex) || done.tryFailure(ex)
            None
          } else Some(failedAttempts)

        case ConnectivityState.READY =>
          ready.trySuccess(())
          // A successful connection resets the failure counter.
          Some(0)

        case ConnectivityState.SHUTDOWN =>
          done.trySuccess(Done)
          None

        case ConnectivityState.IDLE | ConnectivityState.CONNECTING =>
          Some(connectionAttempts)
      }
      // None means monitoring is over (gave up or shut down); otherwise wait for the next change
      // away from the state just handled and carry the updated counter along.
      newAttemptOpt.foreach { attempts =>
        channel.notifyWhenStateChanged(currentState, () => monitor(channel.getState(false), attempts))
      }
    }
    monitor(channel.getState(false), 0)
  }