in flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/Benchmark.java [74:127]
private static void executeBenchmarks(CommandLine commandLine) throws Exception {
String configFile = commandLine.getArgs()[0];
Map<String, Map<String, Map<String, ?>>> benchmarks =
BenchmarkUtils.parseJsonFile(configFile);
System.out.printf("Found %d benchmarks.\n", benchmarks.keySet().size());
String saveFile = commandLine.getOptionValue(OUTPUT_FILE_OPTION.getLongOpt());
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
env.getConfig().disableGenericTypes();
env.setRestartStrategy(RestartStrategies.noRestart());
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
int index = 0;
for (Map.Entry<String, Map<String, Map<String, ?>>> entry : benchmarks.entrySet()) {
String benchmarkName = entry.getKey();
Map<String, Map<String, ?>> benchmarkMap = entry.getValue();
LOG.info(
String.format(
"Running benchmark %d/%d: %s",
index++, benchmarks.keySet().size(), benchmarkName));
try {
BenchmarkResult result =
BenchmarkUtils.runBenchmark(tEnv, benchmarkName, benchmarkMap, false);
benchmarkMap.put("results", result.toMap());
LOG.info(String.format("Benchmark %s finished.\n%s", benchmarkName, benchmarkMap));
} catch (Exception e) {
benchmarkMap.put(
"results",
Collections.singletonMap(
"exception",
String.format(
"%s(%s:%s)",
e,
e.getStackTrace()[0].getFileName(),
e.getStackTrace()[0].getLineNumber())));
LOG.error(String.format("Benchmark %s failed.\n%s", benchmarkName, e));
}
}
System.out.println("Benchmarks execution completed.");
String benchmarkResultsJson =
ReadWriteUtils.OBJECT_MAPPER
.writerWithDefaultPrettyPrinter()
.writeValueAsString(benchmarks);
if (commandLine.hasOption(OUTPUT_FILE_OPTION.getLongOpt())) {
FileUtils.saveToFile(saveFile, benchmarkResultsJson, true);
System.out.printf("Benchmark results saved as json in %s.\n", saveFile);
} else {
System.out.printf("Benchmark results summary:\n%s\n", benchmarkResultsJson);
}
}