in kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/handler/ExecuteRequestHandler.scala [51:151]
/**
 * Processes an incoming execute request kernel message.
 *
 * Registers the message as the current incoming message, increments the
 * execution counter for the message's session, then parses the message's
 * content string as an ExecuteRequest. A successfully parsed request is
 * handed to `handleExecuteRequest`; validation failures are handed to
 * `parseErrorHandler`.
 *
 * @param km The incoming kernel message containing the execute request
 * @return Whatever Utilities.parseAndHandle yields for the selected handler
 */
override def process(km: KernelMessage): Future[_] = {
  // Mark the message as our new incoming kernel message for execution
  ExecuteRequestState.processIncomingKernelMessage(km)

  val skeletonBuilder = KMBuilder().withParent(km).withIds(km.ids)
  val executionCount = ExecutionCounter.incr(km.header.session)
  val relayActor = actorLoader.load(SystemActorType.KernelMessageRelay)

  /**
   * Announces the execution, dispatches the request to the execute request
   * relay and relays the resulting reply/result (or error) messages.
   *
   * @param executeRequest The parsed execute request
   * @return The future of the (reply, result) pair obtained from the relay
   */
  def handleExecuteRequest(executeRequest: ExecuteRequest):
      Future[(ExecuteReply, ExecuteResult)] = {
    // Plugin events are only fired for non-silent requests and only when a
    // plugin manager is present; evaluated on each use, like the original
    // inline checks.
    def activePlugins =
      if (executeRequest.silent) None else Option(kernel.pluginManager)

    // Send an ExecuteInput to the client saying we are executing something
    val executeInputMessage = skeletonBuilder
      .withHeader(MessageType.Outgoing.ExecuteInput)
      .withContentString(ExecuteInput(executeRequest.code, executionCount)).build
    relayMsg(executeInputMessage, relayActor)

    // Construct our new set of streams
    // TODO: Add support for error streams
    val outputStream = kernel.factory(
      parentMessage = km,
      kmBuilder = skeletonBuilder
    ).newKernelOutputStream()

    // Fire PreRunCell BEFORE the request is sent to the relay: once the
    // message has been sent, the receiving actor may start working on it at
    // any time, so firing afterwards would let "pre-run" handlers race with
    // (or trail) the actual execution.
    activePlugins.foreach { plugins =>
      import org.apache.toree.plugins.Implicits._
      plugins.fireEvent(PreRunCell, "outputStream" -> outputStream)
    }

    val executeFuture = ask(
      actorLoader.load(SystemActorType.ExecuteRequestRelay),
      (executeRequest, km, outputStream)
    ).mapTo[(ExecuteReply, ExecuteResult)]

    // Flush the output stream after code execution completes to ensure
    // stream messages are sent prior to idle status messages.
    executeFuture andThen { case _ =>
      outputStream.flush()
    } andThen {
      case Success(tuple) =>
        val (executeReply, executeResult) = updateCount(tuple, executionCount)

        if (executeReply.status == "error") {
          // Send an ExecuteReplyError with the result of the code execution to ioPub.error
          val replyError: ExecuteReply = ExecuteReplyError(
            executionCount,
            executeReply.ename,
            executeReply.evalue,
            executeReply.traceback)
          relayErrorMessages(relayActor, replyError, skeletonBuilder)
        } else {
          // Send an ExecuteReply to the client
          val executeReplyMsg = skeletonBuilder
            .withHeader(MessageType.Outgoing.ExecuteReply)
            .withMetadata(Metadata("status" -> executeReply.status))
            .withContentString(executeReply).build
          relayMsg(executeReplyMsg, relayActor)

          // An ExecuteResult message is only published when there is content
          if (executeResult.hasContent) {
            val executeResultMsg = skeletonBuilder
              .withIds(Seq(MessageType.Outgoing.ExecuteResult.toString.getBytes))
              .withHeader(MessageType.Outgoing.ExecuteResult)
              .withContentString(executeResult).build

            activePlugins.foreach { plugins =>
              import org.apache.toree.plugins.Implicits._
              plugins.fireEvent(PostRunCell, "executeResult" -> executeResult)
            }

            relayMsg(executeResultMsg, relayActor)
          }
        }

      case Failure(error) =>
        // Send an ExecuteReplyError to the client on the Shell socket.
        // Option(...) guards against a null canonical name or message.
        val replyError: ExecuteReply = ExecuteReplyError(
          executionCount,
          Option(error.getClass.getCanonicalName),
          Option(error.getMessage),
          Option(error.getStackTrace.map(_.toString).toList))
        relayErrorMessages(relayActor, replyError, skeletonBuilder)
    }
  }

  /**
   * Logs the validation errors found while parsing the execute request and
   * relays them to the client as an ExecuteReplyError.
   *
   * @param invalid The JSON paths that failed validation with their errors
   * @return A future wrapping the relaying of the error messages
   */
  def parseErrorHandler(invalid: Seq[(JsPath, Seq[JsonValidationError])]) = {
    val errs = invalid.map { case (path, errors) =>
      s"JSPath ${path} has error ${errors}"
    }.toList
    logger.error(s"Validation errors when parsing ExecuteRequest: ${errs}")

    val replyError: ExecuteReply = ExecuteReplyError(
      executionCount,
      Option("JsonParseException"),
      Option("Error parsing fields"),
      Option(errs)
    )
    Future { relayErrorMessages(relayActor, replyError, skeletonBuilder) }
  }

  Utilities.parseAndHandle(
    km.contentString,
    ExecuteRequest.executeRequestReads,
    handler = handleExecuteRequest,
    errHandler = parseErrorHandler)
}