in finagle-core/src/main/scala/com/twitter/finagle/service/FailFastFactory.scala [161:224]
/**
 * Hands the batch of observations back exactly as received; nothing is
 * dropped, reordered, or merged at this stage.
 */
def preprocess(elems: Seq[Observation]): Seq[Observation] =
  identity(elems)
/**
 * Applies a single observation to the fail-fast state machine.
 *
 * Transitions, as implemented below:
 *  - `Success` while not `Ok`: cancel the scheduled probe, increment the
 *    marked-available counter, and go back to `Ok`.
 *  - `Fail` while `Ok`: schedule a `Timeout` observation after the current
 *    backoff duration (zero delay if the schedule is already exhausted),
 *    increment the marked-dead counter, and enter `Retrying`.
 *  - `Timeout` while not `Ok`: request a service from `underlying` and feed
 *    the outcome back in as `Success` or `TimeoutFail`.
 *  - `TimeoutFail` while not `Ok`: cancel the current probe, then either
 *    return to `Ok` (backoffs exhausted) or schedule the next probe.
 *  - `Close`: reset to `Ok` and cancel any scheduled probe.
 *
 * Every other observation/state combination is ignored.
 *
 * @param o the observation to apply
 */
def handle(o: Observation): Unit = o match {
  case Observation.Success if state != Ok =>
    // Any non-Ok state is destructured as Retrying; a different shape would
    // surface as a MatchError here.
    val Retrying(_, _, pendingProbe, _, _) = state
    pendingProbe.cancel()
    markedAvailableCounter.incr()
    state = Ok

  case Observation.Fail(failure) if state == Ok =>
    val (delay, remaining) =
      if (!backoffs.isExhausted) (backoffs.duration, backoffs.next)
      else (Duration.Zero, Backoff.empty)
    val markedAt = Time.now
    val probe = timer.schedule(markedAt + delay) { this.apply(Observation.Timeout) }
    markedDeadCounter.incr()
    if (logger.isLoggable(Level.DEBUG)) {
      logger.log(
        Level.DEBUG,
        s"""FailFastFactory marking connection to "$label" as dead. Remote Address: ${endpoint.toString}"""
      )
    }
    state = Retrying(failWith(failure), markedAt, probe, 0, remaining)

  case Observation.Timeout if state != Ok =>
    // Probe the underlying factory and report the result as a new observation.
    val attempt = underlying(ClientConnection.nil)
    attempt.respond {
      case Return(service) =>
        // Report the success first, then release the service obtained by the probe.
        this.apply(Observation.Success)
        service.close()
      case Throw(cause) =>
        this.apply(Observation.TimeoutFail(cause))
    }

  case Observation.TimeoutFail(failure) if state != Ok =>
    state match {
      case Retrying(_, since, staleProbe, attempts, schedule) =>
        // Check exhaustion before cancelling, then cancel in either case.
        val exhausted = schedule.isExhausted
        staleProbe.cancel()
        if (exhausted) {
          // Backoff schedule exhausted. Optimistically become available in
          // order to continue trying.
          state = Ok
        } else {
          val nextProbe = timer.schedule(Time.now + schedule.duration) {
            this.apply(Observation.Timeout)
          }
          state = Retrying(failWith(failure), since, nextProbe, attempts + 1, schedule.next)
        }
      // Excluded by the `state != Ok` guard on the enclosing case.
      case Ok => assert(false)
    }

  case Observation.Close =>
    // Reset the state first, then cancel whatever probe the old state held.
    val previous = state
    state = Ok
    previous match {
      case Retrying(_, _, probe, _, _) => probe.cancel()
      case _ => ()
    }

  case _ => ()
}