in datafu-spark/src/main/scala/spark/utils/overwrites/SparkPythonRunner.scala [57:116]
/**
 * Starts a Py4J gateway server and launches a Python child process configured to
 * connect back to it.
 *
 * The Python executable is taken from `PYSPARK_DRIVER_PYTHON`, then `PYSPARK_PYTHON`,
 * falling back to `python3`. The child is started with `-iu` followed by `otherArgs`,
 * and receives `PYTHONPATH`, `PYTHONUNBUFFERED`, `PYSPARK_GATEWAY_PORT` and
 * `PYSPARK_GATEWAY_SECRET` in its environment.
 *
 * If the gateway did not bind to a port, or the Python process cannot be launched,
 * the gateway server is shut down before the error is propagated, so a failed
 * initialization does not leave a listening gateway behind.
 *
 * @return a triple of (reader over the child's merged stdout/stderr, writer to the
 *         child's stdin, the child process itself)
 * @throws IllegalStateException if the gateway server is not bound to any port after
 *                               its start-up thread has finished
 */
private def initPythonEnv(): (BufferedReader, BufferedWriter, Process) = {
  import scala.util.control.NonFatal

  val pythonExec =
    sys.env.getOrElse("PYSPARK_DRIVER_PYTHON",
      sys.env.getOrElse("PYSPARK_PYTHON", "python3"))

  // Format python filename paths before adding them to the PYTHONPATH
  val formattedPyFiles = PythonRunner.formatPaths(pyPaths)

  // Launch a Py4J gateway server for the process to connect to; this will let it see our
  // Java system properties and such
  val authToken = createSecret(256)
  val gatewayServer = new GatewayServer.GatewayServerBuilder()
    .entryPoint(ScalaPythonBridge)
    .javaPort(0)
    .authToken(authToken)
    .build()
  val thread = new Thread(new Runnable() {
    override def run(): Unit = Utils.logUncaughtExceptions {
      gatewayServer.start()
    }
  })
  thread.setName("py4j-gateway-init")
  thread.setDaemon(true)
  thread.start()

  // Wait until the gateway server has started, so that we know which port it is bound to.
  // `gatewayServer.start()` will start a new thread and run the server code there, after
  // initializing the socket, so the thread started above will end as soon as the server is
  // ready to serve connections.
  thread.join()

  try {
    // A failure inside `gatewayServer.start()` only surfaces on the helper thread, so the
    // bound port is checked here: -1 means no server socket is bound, and handing that to
    // Python would only produce an obscure connection error later on.
    val gatewayPort = gatewayServer.getListeningPort
    if (gatewayPort == -1) {
      throw new IllegalStateException(
        "Py4J gateway server failed to bind to a port; cannot launch Python process")
    }

    // Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the
    // python directories in SPARK_HOME (if set), and any files in the pyPaths argument
    val pathElements = new ArrayBuffer[String]
    pathElements ++= formattedPyFiles
    pathElements += PythonUtils.sparkPythonPath
    pathElements += sys.env.getOrElse("PYTHONPATH", "")
    val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)
    // Log the merged value that is actually exported to the child process, not just the
    // user-supplied py files, so the message matches what Python will see.
    logger.info(s"Running python with PYTHONPATH:\n\t$pythonPath")

    // Launch Python process
    val builder = new ProcessBuilder(
      (Seq(pythonExec, "-iu") ++ otherArgs).asJava)
    val env = builder.environment()
    env.put("PYTHONPATH", pythonPath)
    // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
    env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
    env.put("PYSPARK_GATEWAY_PORT", gatewayPort.toString)
    env.put("PYSPARK_GATEWAY_SECRET", authToken)
    builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
    val process = builder.start()

    // Both streams are wrapped without an explicit charset, i.e. with the JVM default.
    val writer = new BufferedWriter(
      new OutputStreamWriter(process.getOutputStream))
    val reader = new BufferedReader(
      new InputStreamReader(process.getInputStream))
    (reader, writer, process)
  } catch {
    case NonFatal(e) =>
      // Do not leave the gateway listening when initialization fails; keep the original
      // error as the primary one and attach any shutdown failure to it.
      try gatewayServer.shutdown()
      catch { case NonFatal(shutdownError) => e.addSuppressed(shutdownError) }
      throw e
  }
}