in finagle-mux/src/main/scala/com/twitter/finagle/mux/pushsession/MuxClientSession.scala [246:302]
private[this] def handleProcessMessage(msg: Message): Unit = {
msg match {
case Message.Tping(Message.Tags.PingTag) =>
h_messageWriter.write(Message.PreEncoded.Rping)
case Message.Tping(tag) =>
h_messageWriter.write(Message.Rping(tag))
case p @ Message.Rping(_) =>
if (h_pingPromise == null) {
log.info(s"($name) Received unexpected ping response: $p")
} else {
val p = h_pingPromise
h_pingPromise = null
p.setDone()
}
case Message.Rerr(_, err) =>
log.info(s"($name) Server error: $err")
h_tracker.receivedResponse(msg.tag, Throw(ServerError(err)))
case Message.Rmessage(_) =>
h_tracker.receivedResponse(msg.tag, ReqRepFilter.reply(msg))
case Message.Tdrain(tag) =>
// Ack the Tdrain and begin shutting down.
h_messageWriter.write(Message.Rdrain(tag))
h_dispatchState = Draining
drainingCounter.incr()
if (log.isLoggable(Level.TRACE))
log.trace(s"Started draining a connection to $name")
case Message.Tlease(Message.Tlease.MillisDuration, millis) =>
h_dispatchState match {
case Leasing(_) | Dispatching =>
h_dispatchState = Leasing(Time.now + millis.milliseconds)
if (log.isLoggable(Level.DEBUG))
log.debug(s"($name) leased for ${millis.milliseconds} to $name")
leaseCounter.incr()
case Draining | Drained =>
// Ignore the lease if we're closed, since these are irrecoverable states.
}
case other =>
handleShutdown(Some(new IllegalStateException(s"Unexpected message: $other")))
}
// Check if we're in a finished-draining state and transition accordingly
if (h_dispatchState == Draining && h_tracker.pendingDispatches == 0) {
if (log.isLoggable(Level.TRACE)) {
log.trace(s"Finished draining a connection to $name")
}
drainedCounter.incr()
handleShutdown(None)
}
}