public void generateFlinkJobs()

in odps-console-xflow/src/main/java/com/aliyun/openservices/odps/console/xflow/AlinkTransformer.java [77:133]


    /**
     * Builds the Flink job list by reflectively invoking Alink's
     * {@code com.alibaba.alink.executor.pai.PaiMain} from an isolated class loader.
     *
     * <p>The class path of that loader consists of the Alink algorithm jar, the Alink base
     * jar and every file found (recursively) under {@code <flinkConsoleRootDir>/lib}, in
     * that order. While {@code PaiMain} runs, {@code System.out} and {@code System.err} are
     * redirected to in-memory buffers; the captured text is forwarded to the debug output
     * of {@code paiContext} afterwards, whether or not the invocation succeeded. The JSON
     * returned by {@code getFlinkJobsJson} is parsed into {@code flinkJobList}, and the
     * value returned by {@code getVersion} is reported through the output writer.
     *
     * @param paiContext          execution context supplying the output writer
     * @param commandLine         command line passed on to {@code constructAlgoJobJson}
     * @param flinkConsoleRootDir root directory of the Flink console; must not be null and
     *                            must contain a {@code lib} directory
     * @param alinkBaseJarPath    path of the Alink base jar
     * @param alinkAlgoJarPath    path of the Alink algorithm jar
     * @throws ODPSConsoleException if the Flink console root or its {@code lib} directory
     *                              is missing
     * @throws Exception            any failure raised while loading or invoking
     *                              {@code PaiMain}; it is rethrown unchanged after its
     *                              stack trace has been printed
     */
    public void generateFlinkJobs(
            ExecutionContext paiContext,
            CommandLine commandLine,
            String flinkConsoleRootDir,
            String alinkBaseJarPath,
            String alinkAlgoJarPath) throws Exception {
        String algoJobJson = constructAlgoJobJson(paiContext, commandLine);

        if (flinkConsoleRootDir == null) {
            throw new ODPSConsoleException("Flink console root dir is not specified.");
        }

        File flinkConsoleLibDir = new File(flinkConsoleRootDir, "lib");
        if (!flinkConsoleLibDir.isDirectory()) {
            throw new ODPSConsoleException(
                    "Flink console lib dir " + flinkConsoleLibDir.getAbsolutePath() + " not found.");
        }

        // Order matters for class-path precedence: algo jar, then base jar, then console libs.
        // The listing is copied instead of cast because listFiles only promises a Collection.
        List<File> files = new java.util.ArrayList<File>();
        files.add(new File(alinkAlgoJarPath));
        files.add(new File(alinkBaseJarPath));
        files.addAll(FileUtils.listFiles(flinkConsoleLibDir, null, true));

        URL[] urls = new URL[files.size()];
        int i = 0;
        for (File file : files) {
            // File.toURI() escapes spaces and other illegal characters and is platform-safe,
            // unlike concatenating "file://" with the raw path.
            URL url = file.toURI().toURL();
            urls[i++] = url;
            paiContext.getOutputWriter().writeDebug("Find jar file " + url + ".");
        }

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        ByteArrayOutputStream err = new ByteArrayOutputStream();
        PrintStream origOut = System.out;
        PrintStream origErr = System.err;
        String alinkVersion;
        // try-with-resources releases the jar handles held by the class loader; only plain
        // Strings escape from the loaded classes, so closing it here is safe.
        try (URLClassLoader child = new URLClassLoader(urls)) {
            try {
                System.setOut(new PrintStream(out, true));
                System.setErr(new PrintStream(err, true));

                Class<?> classToLoad =
                        Class.forName("com.alibaba.alink.executor.pai.PaiMain", true, child);

                Method method = classToLoad.getDeclaredMethod("getFlinkJobsJson", String.class);
                Object instance = classToLoad.getDeclaredConstructor().newInstance();
                Object result = method.invoke(instance, algoJobJson);
                Gson gson = new GsonBuilder().disableHtmlEscaping().create();
                flinkJobList = gson.fromJson((String) result, FlinkJobList.class);

                // fetch alink version
                method = classToLoad.getDeclaredMethod("getVersion");
                instance = classToLoad.getDeclaredConstructor().newInstance();
                alinkVersion = (String) method.invoke(instance);
            } finally {
                // Restore the real streams even when an Error (e.g. NoClassDefFoundError)
                // escapes, otherwise the whole console would stay redirected.
                System.setOut(origOut);
                System.setErr(origErr);
                // Surface whatever Alink printed, on success and on failure alike.
                paiContext.getOutputWriter().writeDebug(out.toString());
                paiContext.getOutputWriter().writeDebug(err.toString());
            }
        } catch (Exception e) {
            // Streams are already restored at this point, so the trace reaches the real stderr.
            e.printStackTrace();
            throw e;
        }

        paiContext.getOutputWriter().writeError("Alink version:" + alinkVersion);
    }