in communication/src/main/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnable.scala [137:176]
/**
 * Creates the ZeroMQ socket that this runnable operates on.
 *
 * Declared as a separate protected method so the socket creation step can be
 * overridden by subclasses.
 *
 * @param zmqContext The ZeroMQ context used to create the socket
 * @param socketType The type of socket to create
 * @return The socket returned by the context for the given type
 */
protected def newZmqSocket(zmqContext: ZMQ.Context, socketType: SocketType) = {
  zmqContext.socket(socketType)
}
/**
 * Creates the socket, applies its options, and then loops while `notClosed`
 * holds, alternating between processing the next outbound message and the
 * next inbound message with a one millisecond pause between iterations.
 *
 * Failures of a single send/receive step are logged and do not terminate the
 * loop. Any other exception escaping the loop is logged and ends the loop.
 * The socket is always closed on the way out, and a failure to close it is
 * logged rather than propagated.
 */
override def run(): Unit = {
  val socket = newZmqSocket(context, socketType)
  try {
    processOptions(socket)

    while (notClosed) {
      Try(processNextOutboundMessage(socket)).failed.foreach(
        logger.error("Failed to send next outgoing message!", _: Throwable)
      )

      Try(processNextInboundMessage(socket)).failed.foreach {
        // Deliberately ignored: illegal-state failures on receive are treated
        // as routine here and are not logged (earlier, now disabled, code
        // singled out the "Cannot receive another request" message).
        case _: java.lang.IllegalStateException => ()
        case e =>
          logger.error("Failed to retrieve next incoming message!", e)
      }

      // Brief pause between polling iterations
      Thread.sleep(1)
    }
  } catch {
    case ex: Exception =>
      logger.error("Unexpected exception in 0mq socket runnable!", ex)
  } finally {
    try {
      socket.close()
    } catch {
      case ex: Exception =>
        // Pass the caught exception explicitly. The previous `_: Throwable`
        // placeholder only built an unused function value, so nothing was
        // ever logged when closing the socket failed.
        logger.error("Failed to close socket!", ex)
    }
  }
}