in odps-console-xflow/src/main/java/com/aliyun/openservices/odps/console/xflow/AlinkAdapter.java [325:394]
/**
 * Runs the local flink console once per job in {@code flinkJobList} and collects the
 * "plan" group that {@code cupidPattern} extracts from each run's output.
 *
 * <p>For every job this method writes the job's flink configuration into a fresh
 * UUID-named directory under {@code FLINK_CONSOLE_LOCAL_DIR}, assembles a
 * {@code ./bin/flink run} shell command prefixed with the required environment
 * variables, stores the command through {@code createTempCommandFile} and executes it
 * with {@code /bin/bash}. Runs whose output does not match {@code cupidPattern}
 * contribute nothing to the result.
 *
 * <p>NOTE(review): values are inserted into the shell command unquoted, so paths or
 * parameters containing whitespace or shell metacharacters will be re-interpreted by
 * bash. Left as is because callers may rely on pre-quoted parameters - confirm.
 *
 * @param flinkJobList jobs to run, together with the default conf files handed to
 *     {@code writeFlinkConfFiles}
 * @param alinkBaseJarFilePath first entry of the {@code HADOOP_CLASSPATH} exported to the command
 * @param alinkBaseJarDir directory passed to flink through {@code -yt}
 * @param alinkAlgoJarFilePath jar passed to flink after {@code -d}
 * @param flinkConsoleLocalDir directory the command changes into before invoking
 *     {@code ./bin/flink}
 * @return the extracted plans joined by {@code ";"}; empty when no run produced a match
 * @throws ODPSConsoleException declared by the original signature; presumably raised by
 *     the helper calls - confirm
 * @throws OdpsException declared by the original signature; presumably raised by the
 *     helper calls - confirm
 */
public String generateCupidTaskPlanContent(
    AlinkTransformer.FlinkJobList flinkJobList,
    String alinkBaseJarFilePath,
    String alinkBaseJarDir,
    String alinkAlgoJarFilePath,
    String flinkConsoleLocalDir) throws ODPSConsoleException, OdpsException {
  String flinkLogPath = FLINK_CONSOLE_LOCAL_DIR + "log/";
  File flinkLogDir = new File(flinkLogPath);
  // mkdirs() also returns false when another process created the directory in the
  // meantime, hence the final isDirectory() check before reporting a failure.
  if (!flinkLogDir.exists() && !flinkLogDir.mkdirs() && !flinkLogDir.isDirectory()) {
    // Best effort, as before: do not abort, but leave a trace for troubleshooting.
    getLogWriter().writeDebug("Failed to create flink log dir: " + flinkLogPath);
  }

  // The process environment does not change between jobs, so read it once.
  String javaHome = System.getenv("JAVA_HOME");

  StringBuilder planContent = new StringBuilder();
  for (AlinkTransformer.FlinkJob job : flinkJobList.flinkJobs) {
    String tmpFlinkConfDir = FLINK_CONSOLE_LOCAL_DIR + UUID.randomUUID().toString() + "/";
    writeFlinkConfFiles(job, flinkJobList.defaultConfFiles, tmpFlinkConfDir);

    StringBuilder cmdBuilder = new StringBuilder();
    cmdBuilder.append("cd ").append(flinkConsoleLocalDir).append(" && ");

    Map<String, String> allEnvs = new HashMap<String, String>();
    getLogWriter().writeDebug("JAVA_HOME = " + javaHome);
    if (javaHome != null) {
      // Exporting an unset JAVA_HOME would put the literal text "JAVA_HOME=null" in
      // front of the command and override the flink script's own Java lookup.
      allEnvs.put("JAVA_HOME", javaHome);
    }
    allEnvs.put("FLINK_LOG_DIR", flinkLogPath);
    allEnvs.put("FLINK_CONF_DIR", tmpFlinkConfDir);
    allEnvs.put("HADOOP_CLASSPATH", alinkBaseJarFilePath + ":" + tmpFlinkConfDir);
    allEnvs.put("CUPID_DRY_RUN_MODE_ENABLE", "true");
    if (job.envs != null) {
      // Job-specific settings are applied last so they win over the defaults above.
      allEnvs.putAll(job.envs);
    }
    for (Map.Entry<String, String> env : allEnvs.entrySet()) {
      cmdBuilder.append(env.getKey()).append("=").append(env.getValue()).append(" ");
    }

    cmdBuilder.append("./bin/flink run ");
    cmdBuilder.append("-yt ").append(alinkBaseJarDir).append(" ");
    for (String param : job.cupidParams) {
      cmdBuilder.append(param).append(" ");
    }
    cmdBuilder.append("-d ").append(alinkAlgoJarFilePath).append(" ");
    for (String param : job.alinkParams) {
      cmdBuilder.append(param).append(" ");
    }

    String command = cmdBuilder.toString();
    // NOTE(review): progress messages go through writeError in the original code;
    // presumably so they stay visible regardless of debug settings - confirm.
    getLogWriter().writeError("Flink console is running...");
    getLogWriter().writeDebug("Flink command:" + command);
    String cmdFile = createTempCommandFile(command);
    String result = ExecuteShell("/bin/bash " + cmdFile);

    Matcher m = cupidPattern.matcher(result);
    if (m.find()) {
      // Each plan is followed by ';'; the trailing one is removed after the loop.
      planContent.append(m.group("plan")).append(";");
    }
    getLogWriter().writeError("Flink console execute success.");
  }

  if (planContent.length() > 0) {
    // Drop the separator appended after the last plan.
    planContent.setLength(planContent.length() - 1);
  }
  String cupidTaskPlanContent = planContent.toString();
  getLogWriter().writeDebug("cupidTaskPlanContent:" + cupidTaskPlanContent);
  return cupidTaskPlanContent;
}