in runtime/src/main/scala/org/apache/pekko/grpc/internal/ChannelUtils.scala [71:103]
private[pekko] def monitorChannel(
ready: Promise[Unit],
done: Promise[Done],
channel: ManagedChannel,
maxConnectionAttempts: Option[Int],
log: LoggingAdapter): Unit = {
def monitor(currentState: ConnectivityState, connectionAttempts: Int): Unit = {
log.debug(s"monitoring with state $currentState and connectionAttempts $connectionAttempts")
val newAttemptOpt = currentState match {
case ConnectivityState.TRANSIENT_FAILURE =>
if (maxConnectionAttempts.contains(connectionAttempts + 1)) {
val ex = new ClientConnectionException(s"Unable to establish connection after [$maxConnectionAttempts]")
ready.tryFailure(ex) || done.tryFailure(ex)
None
} else Some(connectionAttempts + 1)
case ConnectivityState.READY =>
ready.trySuccess(())
Some(0)
case ConnectivityState.SHUTDOWN =>
done.trySuccess(Done)
None
case ConnectivityState.IDLE | ConnectivityState.CONNECTING =>
Some(connectionAttempts)
}
newAttemptOpt.foreach { attempts =>
channel.notifyWhenStateChanged(currentState, () => monitor(channel.getState(false), attempts))
}
}
monitor(channel.getState(false), 0)
}