in odps-console-xflow/src/main/java/com/aliyun/openservices/odps/console/xflow/AlinkTransformer.java [77:133]
public void generateFlinkJobs(
ExecutionContext paiContext,
CommandLine commandLine,
String flinkConsoleRootDir,
String alinkBaseJarPath,
String alinkAlgoJarPath) throws Exception {
String algoJobJson = constructAlgoJobJson(paiContext, commandLine);
if (flinkConsoleRootDir == null) {
throw new ODPSConsoleException("Flink console root dir " + flinkConsoleRootDir + " not found.");
}
String flinkConsoleLibDir = flinkConsoleRootDir + "/lib/";
List<File> files = (List<File>) FileUtils.listFiles(new File(flinkConsoleLibDir), null, true);
files.add(0, new File(alinkBaseJarPath));
files.add(0, new File(alinkAlgoJarPath));
URL[] urls = new URL[files.size()];
int i = 0;
for (File file : files) {
String filePath = "file://" + file.getAbsolutePath();
urls[i++] = new URL(filePath).toURI().toURL();
paiContext.getOutputWriter().writeDebug("Find jar file " + filePath + ".");
}
ByteArrayOutputStream out = new ByteArrayOutputStream();
ByteArrayOutputStream err = new ByteArrayOutputStream();
PrintStream origOut = System.out;
PrintStream origErr = System.err;
try {
System.setOut(new PrintStream(out, true));
System.setErr(new PrintStream(err, true));
URLClassLoader child = new URLClassLoader(urls);
Class classToLoad = Class.forName("com.alibaba.alink.executor.pai.PaiMain", true, child);
Method method = classToLoad.getDeclaredMethod("getFlinkJobsJson", String.class);
Object instance = classToLoad.getDeclaredConstructor().newInstance();
Object result = method.invoke(instance, algoJobJson);
Gson gson = new GsonBuilder().disableHtmlEscaping().create();
flinkJobList = gson.fromJson((String)result, FlinkJobList.class);
// print alink version
method = classToLoad.getDeclaredMethod("getVersion");
instance = classToLoad.getDeclaredConstructor().newInstance();
result = method.invoke(instance);
// resume standard stdout/stderr
System.setOut(origOut);
System.setErr(origErr);
paiContext.getOutputWriter().writeDebug(out.toString());
paiContext.getOutputWriter().writeDebug(err.toString());
paiContext.getOutputWriter().writeError("Alink version:" + (String)result);
} catch(Exception e) {
System.setOut(origOut);
System.setErr(origErr);
e.printStackTrace();
throw e;
}
}