public String generateCupidTaskPlanContent()

in odps-console-xflow/src/main/java/com/aliyun/openservices/odps/console/xflow/AlinkAdapter.java [325:394]


    public String generateCupidTaskPlanContent(
            AlinkTransformer.FlinkJobList flinkJobList,
            String alinkBaseJarFilePath,
            String alinkBaseJarDir,
            String alinkAlgoJarFilePath,
            String flinkConsoleLocalDir) throws ODPSConsoleException, OdpsException {

        String flinkLogPath = FLINK_CONSOLE_LOCAL_DIR + "log/";
        File flinkLogDir = new File(flinkLogPath);
        if (!flinkLogDir.exists()) {
            flinkLogDir.mkdirs();
        }

        String cupidTaskPlanContent = "";
        for (AlinkTransformer.FlinkJob job : flinkJobList.flinkJobs) {
            String tmpFlinkConfDir = FLINK_CONSOLE_LOCAL_DIR + UUID.randomUUID().toString() + "/";
            writeFlinkConfFiles(job, flinkJobList.defaultConfFiles, tmpFlinkConfDir);

            StringBuilder cmdBuilder = new StringBuilder();
            cmdBuilder.append("cd " + flinkConsoleLocalDir + " && ");

            Map<String, String> allEnvs = new HashMap<String, String>();
            getLogWriter().writeDebug("JAVA_HOME = " + System.getenv("JAVA_HOME"));
            allEnvs.put("JAVA_HOME", System.getenv("JAVA_HOME"));
            allEnvs.put("FLINK_LOG_DIR", flinkLogPath);
            allEnvs.put("FLINK_CONF_DIR", tmpFlinkConfDir);
            allEnvs.put("HADOOP_CLASSPATH", alinkBaseJarFilePath + ":" + tmpFlinkConfDir);
            allEnvs.put("CUPID_DRY_RUN_MODE_ENABLE", "true");
            if (job.envs != null) {
                allEnvs.putAll(job.envs);
            }

            for (Map.Entry<String, String> env: allEnvs.entrySet()) {
                cmdBuilder.append(env.getKey() + "=" + env.getValue() + " ");
            }

            cmdBuilder.append("./bin/flink run ");
            cmdBuilder.append("-yt " + alinkBaseJarDir + " ");

            for (String param : job.cupidParams) {
                cmdBuilder.append(param + " ");
            }

            cmdBuilder.append("-d " + alinkAlgoJarFilePath + " ");

            for (String param : job.alinkParams) {
                cmdBuilder.append(param + " ");
            }

            getLogWriter().writeError("Flink console is running...");
            getLogWriter().writeDebug("Flink command:" + cmdBuilder.toString());

            String cmdFile = createTempCommandFile(cmdBuilder.toString());
            String result = ExecuteShell("/bin/bash " + cmdFile);
            Matcher m = cupidPattern.matcher(result);
            if (m.find()) {
                cupidTaskPlanContent += m.group("plan");
                cupidTaskPlanContent += ";";
            }
            getLogWriter().writeError("Flink console execute success.");
        }

        if (!cupidTaskPlanContent.isEmpty()) {
            cupidTaskPlanContent = cupidTaskPlanContent.substring(0, cupidTaskPlanContent.length() - 1);
        }

        getLogWriter().writeDebug("cupidTaskPlanContent:" + cupidTaskPlanContent);

        return cupidTaskPlanContent;
    }