in kernel/src/main/scala/org/apache/toree/boot/layer/HandlerInitialization.scala [117:207]
/**
 * Creates the actors that handle kernel messages. Every actor is created in
 * `actorSystem` and named after the message type it serves
 * (`messageType.toString`).
 *
 * Incoming message types each get a dedicated handler class; outgoing message
 * types each get a `GenericSocketMessageHandler` tied to a socket type.
 *
 * @param actorSystem   actor system in which every handler actor is created
 * @param actorLoader   passed as the first constructor argument of every handler
 * @param interpreter   supplies the language info handed to the kernel-info handler
 * @param kernel        extra constructor argument of the execute-request handler
 * @param commRegistrar constructor argument of the comm open/msg/close handlers
 * @param commStorage   constructor argument of the comm handlers and of the
 *                      comm-info handler
 * @param responseMap   constructor argument of the input-reply handler
 */
private def initializeKernelHandlers(
  actorSystem: ActorSystem, actorLoader: ActorLoader,
  interpreter: Interpreter, kernel: Kernel,
  commRegistrar: CommRegistrar, commStorage: CommStorage,
  responseMap: collection.mutable.Map[String, ActorRef]
): Unit = {
  // Single place where incoming-message handler actors are created.
  // `actorLoader` is always the first constructor argument, followed by
  // `extraArguments` in the order given.
  def initializeRequestHandler[T](
    clazz: Class[T], messageType: MessageType, extraArguments: AnyRef*
  ): ActorRef = {
    val handlerName = messageType.toString
    logger.debug(s"Creating $handlerName handler")
    actorSystem.actorOf(
      Props(clazz, actorLoader +: extraArguments: _*),
      name = handlerName
    )
  }

  // Input handlers are request handlers whose only extra argument is the
  // response map.
  def initializeInputHandler[T](clazz: Class[T], messageType: MessageType): ActorRef =
    initializeRequestHandler(clazz, messageType, responseMap)

  // Comm handlers are request handlers that additionally receive the comm
  // registrar and the comm storage, in that order.
  def initializeCommHandler[T](clazz: Class[T], messageType: MessageType): ActorRef =
    initializeRequestHandler(clazz, messageType, commRegistrar, commStorage)

  // Outgoing messages all use the same generic handler class; only the socket
  // type and the message type (used as the actor name) vary.
  def initializeSocketHandler(socketType: SocketType, messageType: MessageType): Unit = {
    val handlerName = messageType.toString
    // The trailing space in the message is kept so log output is unchanged.
    logger.debug(s"Creating $handlerName to ${socketType.toString} socket handler ")
    actorSystem.actorOf(
      Props(classOf[GenericSocketMessageHandler], actorLoader, socketType),
      name = handlerName
    )
  }

  // Copy the interpreter's language info into the LanguageInfo value that is
  // handed to the kernel-info handler.
  val langInfo = interpreter.languageInfo
  val internalInfo = LanguageInfo(
    name = langInfo.name,
    version = langInfo.version,
    file_extension = langInfo.fileExtension,
    pygments_lexer = langInfo.pygmentsLexer,
    mimetype = langInfo.mimeType,
    codemirror_mode = langInfo.codemirrorMode)

  // These are the handlers for messages coming into the kernel
  initializeRequestHandler(classOf[ExecuteRequestHandler],
    MessageType.Incoming.ExecuteRequest, kernel)
  initializeRequestHandler(classOf[KernelInfoRequestHandler],
    MessageType.Incoming.KernelInfoRequest, internalInfo)
  initializeRequestHandler(classOf[CommInfoRequestHandler],
    MessageType.Incoming.CommInfoRequest, commStorage)
  initializeRequestHandler(classOf[CodeCompleteHandler],
    MessageType.Incoming.CompleteRequest)
  initializeRequestHandler(classOf[IsCompleteHandler],
    MessageType.Incoming.IsCompleteRequest)
  initializeInputHandler(classOf[InputRequestReplyHandler],
    MessageType.Incoming.InputReply)
  initializeCommHandler(classOf[CommOpenHandler],
    MessageType.Incoming.CommOpen)
  initializeCommHandler(classOf[CommMsgHandler],
    MessageType.Incoming.CommMsg)
  initializeCommHandler(classOf[CommCloseHandler],
    MessageType.Incoming.CommClose)

  // These are handlers for messages leaving the kernel through the sockets
  initializeSocketHandler(SocketType.Shell, MessageType.Outgoing.KernelInfoReply)
  initializeSocketHandler(SocketType.Shell, MessageType.Outgoing.CommInfoReply)
  initializeSocketHandler(SocketType.Shell, MessageType.Outgoing.ExecuteReply)
  initializeSocketHandler(SocketType.Shell, MessageType.Outgoing.CompleteReply)
  initializeSocketHandler(SocketType.Shell, MessageType.Outgoing.IsCompleteReply)
  initializeSocketHandler(SocketType.StdIn, MessageType.Outgoing.InputRequest)
  initializeSocketHandler(SocketType.IOPub, MessageType.Outgoing.ExecuteResult)
  initializeSocketHandler(SocketType.IOPub, MessageType.Outgoing.Stream)
  initializeSocketHandler(SocketType.IOPub, MessageType.Outgoing.DisplayData)
  initializeSocketHandler(SocketType.IOPub, MessageType.Outgoing.ClearOutput)
  initializeSocketHandler(SocketType.IOPub, MessageType.Outgoing.ExecuteInput)
  initializeSocketHandler(SocketType.IOPub, MessageType.Outgoing.Status)
  initializeSocketHandler(SocketType.IOPub, MessageType.Outgoing.Error)
  initializeSocketHandler(SocketType.IOPub, MessageType.Outgoing.CommOpen)
  initializeSocketHandler(SocketType.IOPub, MessageType.Outgoing.CommMsg)
  initializeSocketHandler(SocketType.IOPub, MessageType.Outgoing.CommClose)
}