override def process()

in kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/handler/ExecuteRequestHandler.scala [51:151]


  override def process(km: KernelMessage): Future[_] = {
    // Mark the message as our new incoming kernel message for execution
    ExecuteRequestState.processIncomingKernelMessage(km)

    val skeletonBuilder = KMBuilder().withParent(km).withIds(km.ids)
    val executionCount = ExecutionCounter.incr(km.header.session)
    val relayActor = actorLoader.load(SystemActorType.KernelMessageRelay)

    def handleExecuteRequest(executeRequest: ExecuteRequest):
                             Future[(ExecuteReply, ExecuteResult)] = {
      //  Send an ExecuteInput to the client saying we are executing something
      val executeInputMessage = skeletonBuilder
        .withHeader(MessageType.Outgoing.ExecuteInput)
        .withContentString(ExecuteInput(executeRequest.code, executionCount)).build

      relayMsg(executeInputMessage, relayActor)

      // Construct our new set of streams
      // TODO: Add support for error streams
      val outputStream = kernel.factory(
        parentMessage = km,
        kmBuilder = skeletonBuilder
      ).newKernelOutputStream()
      val executeFuture = ask(
        actorLoader.load(SystemActorType.ExecuteRequestRelay),
        (executeRequest, km, outputStream)
      ).mapTo[(ExecuteReply, ExecuteResult)]

      if (!executeRequest.silent && kernel.pluginManager != null){
        import org.apache.toree.plugins.Implicits._
        kernel.pluginManager.fireEvent(PreRunCell, "outputStream" -> outputStream)
      }

      // Flush the output stream after code execution completes to ensure
      // stream messages are sent prior to idle status messages.
      executeFuture andThen { case result =>
        outputStream.flush()
        result
      } andThen {
        case Success(tuple) =>
          val (executeReply, executeResult) = updateCount(tuple, executionCount)

          if (executeReply.status.equals("error")) {
            // Send an ExecuteReplyError with the result of the code execution to ioPub.error
            val replyError: ExecuteReply = ExecuteReplyError(
              executionCount,
              executeReply.ename,
              executeReply.evalue,
              executeReply.traceback)
            relayErrorMessages(relayActor, replyError, skeletonBuilder)
          } else {
            //  Send an ExecuteReply to the client
            val executeReplyMsg = skeletonBuilder
              .withHeader(MessageType.Outgoing.ExecuteReply)
              .withMetadata(Metadata("status" -> executeReply.status))
              .withContentString(executeReply).build

            relayMsg(executeReplyMsg, relayActor)

            if (executeResult.hasContent) {
              val executeResultMsg = skeletonBuilder
                .withIds(Seq(MessageType.Outgoing.ExecuteResult.toString.getBytes))
                .withHeader(MessageType.Outgoing.ExecuteResult)
                .withContentString(executeResult).build
              if (!executeRequest.silent && kernel.pluginManager != null){
                import org.apache.toree.plugins.Implicits._
                kernel.pluginManager.fireEvent(PostRunCell, "executeResult" -> executeResult)
              }
              relayMsg(executeResultMsg, relayActor)
            }
          }

        case Failure(error: Throwable) =>
          //  Send an ExecuteReplyError to the client on the Shell socket
          val replyError: ExecuteReply = ExecuteReplyError(
            executionCount,
            Option(error.getClass.getCanonicalName),
            Option(error.getMessage),
            Option(error.getStackTrace.map(_.toString).toList))
          relayErrorMessages(relayActor, replyError, skeletonBuilder)
      }
    }

    def parseErrorHandler(invalid: Seq[(JsPath, Seq[JsonValidationError])]) = {
      val errs = invalid.map (e => s"JSPath ${e._1} has error ${e._2}").toList
      logger.error(s"Validation errors when parsing ExecuteRequest: ${errs}")
      val replyError: ExecuteReply = ExecuteReplyError(
        executionCount,
        Option("JsonParseException"),
        Option("Error parsing fields"),
        Option(errs)
      )
      Future { relayErrorMessages(relayActor, replyError, skeletonBuilder) }
    }

    Utilities.parseAndHandle(
      km.contentString,
      ExecuteRequest.executeRequestReads,
      handler    = handleExecuteRequest,
      errHandler = parseErrorHandler)
  }