private[this] def handleProcessMessage()

in finagle-mux/src/main/scala/com/twitter/finagle/mux/pushsession/MuxClientSession.scala [246:302]


  /**
   * Reacts to a single inbound message and afterwards checks whether an in-progress
   * drain has run to completion.
   *
   * Handling by message shape:
   *  - `Tping`: an `Rping` is written back; the pre-encoded reply is used when the tag
   *    equals `Message.Tags.PingTag`.
   *  - `Rping`: the outstanding ping promise (if any) is cleared and completed; when no
   *    promise is outstanding the response is only logged.
   *  - `Rerr`: logged, then reported to the tracker as a `Throw(ServerError(...))`.
   *  - `Rmessage`: the result of `ReqRepFilter.reply` is reported to the tracker.
   *  - `Tdrain`: acknowledged with an `Rdrain`, and the dispatch state becomes `Draining`.
   *  - `Tlease` with unit `Message.Tlease.MillisDuration`: a new `Leasing` state is
   *    recorded, unless the state is already `Draining` or `Drained`.
   *  - anything else (including a `Tlease` in any other unit): `handleShutdown` is
   *    invoked with an `IllegalStateException`.
   *
   * @param msg the message to process
   */
  private[this] def handleProcessMessage(msg: Message): Unit = {
    msg match {
      case Message.Tping(pingTag) =>
        // The well-known ping tag has a ready-made reply; any other tag needs its own Rping.
        if (Message.Tags.PingTag == pingTag) h_messageWriter.write(Message.PreEncoded.Rping)
        else h_messageWriter.write(Message.Rping(pingTag))

      case rping @ Message.Rping(_) =>
        val outstanding = h_pingPromise
        if (outstanding != null) {
          // The field is cleared before the promise is completed.
          h_pingPromise = null
          outstanding.setDone()
        } else {
          log.info(s"($name) Received unexpected ping response: $rping")
        }

      case Message.Rerr(_, error) =>
        log.info(s"($name) Server error: $error")
        h_tracker.receivedResponse(msg.tag, Throw(ServerError(error)))

      case Message.Rmessage(_) =>
        h_tracker.receivedResponse(msg.tag, ReqRepFilter.reply(msg))

      case Message.Tdrain(drainTag) =>
        // Acknowledge the drain request first, then flip into the draining state.
        h_messageWriter.write(Message.Rdrain(drainTag))
        h_dispatchState = Draining
        drainingCounter.incr()
        if (log.isLoggable(Level.TRACE)) {
          log.trace(s"Started draining a connection to $name")
        }

      case Message.Tlease(Message.Tlease.MillisDuration, leaseMillis) =>
        h_dispatchState match {
          case Draining | Drained =>
            // These states are irrecoverable, so a lease received now is dropped.
            ()
          case Leasing(_) | Dispatching =>
            val leaseDuration = leaseMillis.milliseconds
            h_dispatchState = Leasing(Time.now + leaseDuration)
            if (log.isLoggable(Level.DEBUG)) {
              log.debug(s"($name) leased for $leaseDuration to $name")
            }
            leaseCounter.incr()
        }

      case unexpected =>
        val cause = new IllegalStateException(s"Unexpected message: $unexpected")
        handleShutdown(Some(cause))
    }

    // A drain is complete once we are draining and the tracker reports nothing pending;
    // the tracker is only consulted when the state is Draining.
    val drainComplete = h_dispatchState == Draining && h_tracker.pendingDispatches == 0
    if (drainComplete) {
      if (log.isLoggable(Level.TRACE)) log.trace(s"Finished draining a connection to $name")
      drainedCounter.incr()
      handleShutdown(None)
    }
  }