def preprocess()

in finagle-core/src/main/scala/com/twitter/finagle/service/FailFastFactory.scala [161:224]


    /**
     * Pre-processing hook for a batch of observations.
     *
     * This implementation performs no filtering, reordering or coalescing:
     * the batch is handed back exactly as it was received.
     *
     * @param elems the observations to pre-process
     * @return `elems`, unchanged
     */
    def preprocess(elems: Seq[Observation]): Seq[Observation] = {
      elems
    }

    /**
     * Applies a single observation to the fail-fast state held in `state`.
     *
     * Transitions, as implemented below:
     *  - `Success` while not `Ok`: cancel the task stored in the `Retrying`
     *    state, increment `markedAvailableCounter`, return to `Ok`.
     *  - `Fail` while `Ok`: schedule a `Timeout` observation after the next
     *    backoff duration (or `Duration.Zero` when the backoffs are already
     *    exhausted), increment `markedDeadCounter`, move to `Retrying`.
     *  - `Timeout` while not `Ok`: request a service from `underlying` and
     *    feed the outcome back as `Success` or `TimeoutFail`.
     *  - `TimeoutFail` while not `Ok`: either reschedule another `Timeout`
     *    with the remaining backoffs, or, when they are exhausted, go back
     *    to `Ok`.
     *  - `Close`: reset to `Ok` and cancel any task left in the old state.
     *  - Any other observation/state combination is ignored.
     *
     * @param o the observation to apply
     */
    def handle(o: Observation): Unit = o match {
      case Observation.Success if state != Ok =>
        // The guard rules out `Ok`; the pattern definition below raises a
        // MatchError if `state` is anything other than `Retrying`.
        val Retrying(_, _, pendingProbe, _, _) = state
        pendingProbe.cancel()
        markedAvailableCounter.incr()
        state = Ok

      case Observation.Fail(cause) if state == Ok =>
        // `backoffs` here is the enclosing scope's schedule, not a state field.
        val exhausted = backoffs.isExhausted
        val delay = if (exhausted) Duration.Zero else backoffs.duration
        val remaining = if (exhausted) Backoff.empty else backoffs.next

        // `startedAt` is recorded in the Retrying state and also anchors the
        // first scheduled probe.
        val startedAt = Time.now
        val firstProbe = timer.schedule(startedAt + delay) {
          this.apply(Observation.Timeout)
        }
        markedDeadCounter.incr()

        if (logger.isLoggable(Level.DEBUG)) {
          val message =
            s"""FailFastFactory marking connection to "$label" as dead. Remote Address: ${endpoint.toString}"""
          logger.log(Level.DEBUG, message)
        }

        state = Retrying(failWith(cause), startedAt, firstProbe, 0, remaining)

      case Observation.Timeout if state != Ok =>
        // Probe the underlying factory; the result is reported back through
        // `apply` rather than mutating `state` directly from the callback.
        val probe = underlying(ClientConnection.nil)
        probe.respond {
          case Return(svc) =>
            this.apply(Observation.Success)
            svc.close()
          case Throw(err) =>
            this.apply(Observation.TimeoutFail(err))
        }

      case Observation.TimeoutFail(cause) if state != Ok =>
        state match {
          case Retrying(_, since, pendingProbe, attempts, remaining) =>
            // Evaluate exhaustion before cancelling, then branch on it.
            val exhausted = remaining.isExhausted
            pendingProbe.cancel()
            if (exhausted) {
              // Backoff schedule exhausted. Optimistically become available in
              // order to continue trying.
              state = Ok
            } else {
              val nextProbe = timer.schedule(Time.now + remaining.duration) {
                this.apply(Observation.Timeout)
              }
              state = Retrying(failWith(cause), since, nextProbe, attempts + 1, remaining.next)
            }

          // Excluded by the guard above; kept as a sanity check.
          case Ok => assert(false)
        }

      case Observation.Close =>
        // Reset the state first, then cancel whatever task the old state held.
        val previous = state
        state = Ok
        previous match {
          case Retrying(_, _, pendingProbe, _, _) =>
            pendingProbe.cancel()
          case _ =>
        }

      case _ => ()

    }