private def initPythonEnv()

in datafu-spark/src/main/scala/spark/utils/overwrites/SparkPythonRunner.scala [57:116]


  /**
   * Starts a Py4J gateway server and launches an interactive Python process that connects to it.
   *
   * The Python executable is taken from `PYSPARK_DRIVER_PYTHON`, then `PYSPARK_PYTHON`, falling
   * back to `python3`, and is started with `-iu` followed by `otherArgs`. Its PYTHONPATH is built
   * from `pyPaths`, `PythonUtils.sparkPythonPath` and any inherited `PYTHONPATH`. The gateway's
   * port and auth secret are passed to Python through `PYSPARK_GATEWAY_PORT` and
   * `PYSPARK_GATEWAY_SECRET`.
   *
   * The stdin/stdout exchange is pinned to UTF-8 on both sides (`PYTHONIOENCODING` for Python,
   * explicit charsets for the Java streams), so it does not depend on the platform defaults of the
   * JVM and of the Python interpreter, which are not guaranteed to agree.
   *
   * @return a reader over the process's merged stdout/stderr, a writer to its stdin, and the
   *         process handle
   * @throws IllegalStateException if the gateway server did not bind to a port
   * @throws java.io.IOException if the Python process could not be started; the gateway server
   *                             is shut down before the exception is rethrown
   */
  private def initPythonEnv(): (BufferedReader, BufferedWriter, Process) = {
    import java.io.IOException
    import java.nio.charset.StandardCharsets

    val pythonExec =
      sys.env.getOrElse("PYSPARK_DRIVER_PYTHON",
                        sys.env.getOrElse("PYSPARK_PYTHON", "python3"))

    // Format python filename paths before adding them to the PYTHONPATH
    val formattedPyFiles = PythonRunner.formatPaths(pyPaths)

    // Launch a Py4J gateway server for the process to connect to; this will let it see our
    // Java system properties and such. Clients must present authToken to connect.
    val authToken = createSecret(256)
    val gatewayServer = new GatewayServer.GatewayServerBuilder()
      .entryPoint(ScalaPythonBridge)
      .javaPort(0) // 0: bind to any free port; the actual port is read back after start()
      .authToken(authToken)
      .build()
    val thread = new Thread(new Runnable() {
      override def run(): Unit = Utils.logUncaughtExceptions {
        gatewayServer.start()
      }
    })
    thread.setName("py4j-gateway-init")
    thread.setDaemon(true)
    thread.start()

    // Wait until the gateway server has started, so that we know which port it is bound to.
    // `gatewayServer.start()` will start a new thread and run the server code there, after
    // initializing the socket, so the thread started above will end as soon as the server is
    // ready to serve connections.
    thread.join()

    // An exception thrown by start() on the thread above does not propagate through join().
    // In that case the server socket is not bound and Py4J reports the port as -1; fail here
    // instead of handing Python an unusable port.
    val gatewayPort = gatewayServer.getListeningPort
    if (gatewayPort == -1) {
      throw new IllegalStateException(
        "Py4J GatewayServer failed to bind to a port; cannot launch the Python process")
    }

    // Build up a PYTHONPATH from the formatted pyPaths, Spark's own python path and any
    // PYTHONPATH inherited from this JVM's environment.
    val pathElements = new ArrayBuffer[String]
    pathElements ++= formattedPyFiles
    pathElements += PythonUtils.sparkPythonPath
    pathElements += sys.env.getOrElse("PYTHONPATH", "")
    val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)
    // Log the merged value actually handed to the process, not just the user-supplied files.
    logger.info(s"Running python with PYTHONPATH:\n\t$pythonPath")

    // Launch Python process
    val builder = new ProcessBuilder((Seq(pythonExec, "-iu") ++ otherArgs).asJava)
    val env = builder.environment()
    env.put("PYTHONPATH", pythonPath)
    // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
    env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
    // Must match the charset used by the Java-side reader/writer created below.
    env.put("PYTHONIOENCODING", "utf-8")
    env.put("PYSPARK_GATEWAY_PORT", gatewayPort.toString)
    env.put("PYSPARK_GATEWAY_SECRET", authToken)
    builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize

    val process =
      try builder.start()
      catch {
        case e: IOException =>
          // No client will ever connect to this gateway; release it before propagating.
          gatewayServer.shutdown()
          throw e
      }
    val writer = new BufferedWriter(
      new OutputStreamWriter(process.getOutputStream, StandardCharsets.UTF_8))
    val reader = new BufferedReader(
      new InputStreamReader(process.getInputStream, StandardCharsets.UTF_8))

    (reader, writer, process)
  }