in kernel-api/src/main/scala/org/apache/toree/interpreter/broker/BrokerProcess.scala [153:209]
/**
 * Starts the broker process.
 *
 * Copies the entry script and the additional resources to temporary
 * locations, checks that every copied file exists, assembles the command
 * line, configures a freshly created executor and launches the process.
 * The executor is recorded in `currentExecutor` once the launch call returns.
 *
 * @throws AssertionError if `currentExecutor` is already defined
 * @throws BrokerException if any copied resource is missing on disk
 */
def start(): Unit = currentExecutor.synchronized {
  // NOTE(review): the monitor used here is whatever value currentExecutor
  // holds on entry, and that field is reassigned at the end of this block;
  // confirm this is the intended lock object.
  assert(currentExecutor.isEmpty, "Process has already been started!")

  val capitalizedBrokerName = brokerName.capitalize

  // Materialize the entry script first, then every additional resource
  val script = copyResourceToTmp(entryResource)
  logger.debug(s"New $brokerName script created: $script")
  val copiedResources = otherResources.map(copyResourceToTmp)

  // Check each copied file, logging a warning for every one that is missing
  val existenceFlags = (script +: copiedResources).map(new File(_)).map { file =>
    val present = file.exists()
    if (!present) {
      val resource = file.getPath
      logger.warn(s"Failed to create resource: $resource")
    }
    present
  }
  if (!existenceFlags.forall(identity)) {
    throw new BrokerException(
      s"Failed to create resources for $capitalizedBrokerName"
    )
  }

  // Command line: the process name, then the script, then user arguments
  val commandLine = CommandLine
    .parse(processName)
    .addArgument(script)
  arguments.foreach(argument => commandLine.addArgument(argument))
  logger.debug(s"$capitalizedBrokerName command: ${commandLine.toString}")

  val brokerExecutor = newExecutor()

  // TODO: Figure out how to dynamically update the streams so that
  //       kernel.out, kernel.err, and kernel.in are used instead
  // NOTE: For now the process is wired to the standard streams, which are
  //       caught by our system and redirected through the kernel to the
  //       client
  val standardStreams = new PumpStreamHandler(System.out, System.err, System.in)
  brokerExecutor.setStreamHandler(standardStreams)

  // An exit status of 1 is treated as a successful exit
  brokerExecutor.setExitValue(1)

  // The broker is a long-running process, so the watchdog must never
  // kill it because of elapsed run time
  val neverTimeout = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT)
  brokerExecutor.setWatchdog(neverTimeout)

  val processEnvironment = newProcessEnvironment().asJava
  logger.debug(s"$capitalizedBrokerName environment: $processEnvironment")

  // Launch with the environment built above; brokerProcessHandler is passed
  // to the executor as the handler for this execution
  brokerExecutor.execute(commandLine, processEnvironment, brokerProcessHandler)

  currentExecutor = Some(brokerExecutor)
}