private static void executeBenchmarks()

in flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/Benchmark.java [74:127]


    private static void executeBenchmarks(CommandLine commandLine) throws Exception {
        String configFile = commandLine.getArgs()[0];
        Map<String, Map<String, Map<String, ?>>> benchmarks =
                BenchmarkUtils.parseJsonFile(configFile);
        System.out.printf("Found %d benchmarks.\n", benchmarks.keySet().size());
        String saveFile = commandLine.getOptionValue(OUTPUT_FILE_OPTION.getLongOpt());

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableObjectReuse();
        env.getConfig().disableGenericTypes();
        env.setRestartStrategy(RestartStrategies.noRestart());
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        int index = 0;
        for (Map.Entry<String, Map<String, Map<String, ?>>> entry : benchmarks.entrySet()) {
            String benchmarkName = entry.getKey();
            Map<String, Map<String, ?>> benchmarkMap = entry.getValue();

            LOG.info(
                    String.format(
                            "Running benchmark %d/%d: %s",
                            index++, benchmarks.keySet().size(), benchmarkName));

            try {
                BenchmarkResult result =
                        BenchmarkUtils.runBenchmark(tEnv, benchmarkName, benchmarkMap, false);
                benchmarkMap.put("results", result.toMap());
                LOG.info(String.format("Benchmark %s finished.\n%s", benchmarkName, benchmarkMap));
            } catch (Exception e) {
                benchmarkMap.put(
                        "results",
                        Collections.singletonMap(
                                "exception",
                                String.format(
                                        "%s(%s:%s)",
                                        e,
                                        e.getStackTrace()[0].getFileName(),
                                        e.getStackTrace()[0].getLineNumber())));
                LOG.error(String.format("Benchmark %s failed.\n%s", benchmarkName, e));
            }
        }
        System.out.println("Benchmarks execution completed.");

        String benchmarkResultsJson =
                ReadWriteUtils.OBJECT_MAPPER
                        .writerWithDefaultPrettyPrinter()
                        .writeValueAsString(benchmarks);
        if (commandLine.hasOption(OUTPUT_FILE_OPTION.getLongOpt())) {
            FileUtils.saveToFile(saveFile, benchmarkResultsJson, true);
            System.out.printf("Benchmark results saved as json in %s.\n", saveFile);
        } else {
            System.out.printf("Benchmark results summary:\n%s\n", benchmarkResultsJson);
        }
    }