in wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonProcessCaller.java [44:106]
/**
 * Starts a Python worker process and waits for it to connect back over a loopback socket.
 *
 * <p>The constructor loads {@code wayang-api-python-defaults.properties}, opens a server socket
 * bound to 127.0.0.1 on an OS-assigned port, launches {@code python3} with the script configured
 * under {@code wayang.api.python.worker} from a background thread, and then blocks for up to
 * 100 seconds until the worker connects. The chosen port is handed to the worker through the
 * {@code PYTHON_WORKER_FACTORY_PORT} environment variable, and {@code PYTHONPATH} is set from
 * {@code wayang.api.python.path}. The worker's stdout and stderr are inherited from this JVM.
 *
 * <p>On any failure the sockets opened here are closed before the exception is thrown, and the
 * underlying cause is attached to the thrown exception.
 *
 * @param serializedUDF the serialized Python UDF; it is not read by this constructor
 * @throws WayangException if the configuration values cannot be read, the server socket cannot
 *         be opened, the worker process cannot be started, or the worker does not connect
 *         within the timeout
 */
public PythonProcessCaller(PythonCode serializedUDF){
    //TODO: document how this configuration can be supplied from code
    this.configuration = new Configuration();
    this.configuration.load(ReflectionUtils.loadResource("wayang-api-python-defaults.properties"));
    this.ready = false;

    // Loopback only: the worker runs on the same machine.
    final byte[] addr = {127, 0, 0, 1};

    final String workerScript;
    final String pythonPath;
    final ServerSocket server;
    try {
        // Resolve the settings on the calling thread (before any socket exists) so that a
        // failing lookup reaches the caller instead of dying silently on the launcher thread.
        workerScript = this.configuration.getStringProperty("wayang.api.python.worker");
        pythonPath = this.configuration.getStringProperty("wayang.api.python.path");
        // Port 0 lets the OS pick a free port; a backlog of 1 suffices for the single worker.
        server = new ServerSocket(0, 1, InetAddress.getByAddress(addr));
    } catch (Exception e) {
        throw new WayangException("Python worker failed", e);
    }
    this.serverSocket = server;

    // Carries a launch failure from the launcher thread back to this thread.
    final java.util.concurrent.atomic.AtomicReference<IOException> launchFailure =
            new java.util.concurrent.atomic.AtomicReference<>();
    final int port = server.getLocalPort();

    Runnable launcher = () -> {
        ProcessBuilder pb = new ProcessBuilder("python3", workerScript);
        Map<String, String> workerEnv = pb.environment();
        workerEnv.put("PYTHON_WORKER_FACTORY_PORT", String.valueOf(port));
        // TODO See what is happening with ENV Python version
        workerEnv.put("PYTHONPATH", pythonPath);
        // The worker writes straight to this JVM's stdout and stderr.
        pb.redirectOutput(Redirect.INHERIT);
        pb.redirectError(Redirect.INHERIT);
        try {
            pb.start();
        } catch (IOException e) {
            launchFailure.set(e);
            // Closing the server socket makes the pending accept() fail right away instead
            // of leaving the constructor blocked until the timeout expires.
            try {
                server.close();
            } catch (IOException ignored) {
                // Nothing more can be done; the launch failure itself is reported below.
            }
        }
    };

    Socket connection = null;
    try {
        this.process = new Thread(launcher);
        this.process.start();

        // Wait (at most 100 s) for the worker to connect to our socket.
        server.setSoTimeout(100000);
        connection = server.accept();
        server.setSoTimeout(0);
    } catch (Exception e) {
        // Release whatever was opened so a failed construction leaks no sockets.
        if (connection != null) {
            try {
                connection.close();
            } catch (IOException ignored) {
                // Best-effort cleanup; the original failure is reported below.
            }
        }
        try {
            server.close();
        } catch (IOException ignored) {
            // Best-effort cleanup; the original failure is reported below.
        }

        IOException launchError = launchFailure.get();
        if (launchError != null) {
            throw new WayangException("Python worker failed", launchError);
        }
        throw new WayangException("Python worker failed to connect back.", e);
    }

    this.socket = connection;
    if (connection.isConnected()) {
        this.ready = true;
    }
}