in finagle-serversets/src/main/scala/com/twitter/finagle/serverset2/HealthStabilizer.scala [27:86]
def apply(
va: Var[ClientHealth],
probationEpoch: Epoch,
statsReceiver: StatsReceiver
): Var[ClientHealth] = {
Var.async[ClientHealth](ClientHealth.Unknown) { u =>
val stateChanges = va.changes.dedup.select(probationEpoch.event).foldLeft[Status](Unknown) {
// always take the first update as our status
case (Unknown, Left(ClientHealth.Healthy)) => Healthy
case (Unknown, Left(ClientHealth.Unhealthy)) => Unhealthy
// Any change from * => healthy makes us immediately healthy
case (_, Left(ClientHealth.Healthy)) => Healthy
// Change from good to bad is placed in limbo starting now
case (Healthy, Left(ClientHealth.Unhealthy)) =>
Probation(Stopwatch.start())
// The probation epoch has ended. If we entered probation > the probation duration then we are
// now unhealthy.
case (Probation(elapsed), Right(())) if elapsed() >= probationEpoch.period =>
Unhealthy
// any other change is ignored
case (v, _) => v
}
val currentStatus = new AtomicReference[Status]()
val gaugeListener = stateChanges.dedup.register(Witness { currentStatus })
// scalafix:off StoreGaugesAsMemberVariables
val gauge = statsReceiver.addGauge("zkHealth") {
currentStatus.get() match {
case Unknown => 0
case Healthy => 1
case Unhealthy => 2
case Probation(_) => 3
}
}
// scalafix:on StoreGaugesAsMemberVariables
val notify = stateChanges
.collect {
// re-map to the underlying health status
case Healthy | Probation(_) => ClientHealth.Healthy
case Unhealthy => ClientHealth.Unhealthy
case Unknown => ClientHealth.Unknown
}
.dedup
.register(Witness(u))
Closable.all(
notify,
gaugeListener,
Closable.make { _ =>
gauge.remove()
Future.Done
})
}
}