in wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/Job.java [273:338]
/**
 * Runs this job: prepares the plan, estimates its key figures, creates an initial
 * {@link ExecutionPlan} and then alternates between {@code execute(...)} and
 * {@code postProcess(...)} until {@code execute(...)} returns {@code true}.
 * <p>
 * A job may be run only once; the guard is an atomic get-and-set on {@code hasBeenExecuted}.
 * Resources are released and the stop watch results are logged regardless of the outcome.
 *
 * @throws WayangException if the job has already been executed, or if any step fails
 *                         (a non-{@link WayangException} failure is wrapped with its cause preserved)
 */
protected void doExecute() {
    // Make sure that each job is only executed once.
    if (this.hasBeenExecuted.getAndSet(true)) {
        throw new WayangException("Job has already been executed.");
    }

    try {
        // Prepare the #wayangPlan for the optimization.
        this.optimizationRound.start();
        this.prepareWayangPlan();

        // Estimate cardinalities and execution times for the #wayangPlan.
        this.estimateKeyFigures();

        // Get an execution plan.
        int executionId = 0;
        ExecutionPlan executionPlan = this.createInitialExecutionPlan();
        this.optimizationRound.stop();
        this.recordExecutionPlanMeasurement(executionPlan, executionId);

        // TODO: generate run ID. For now we fix this because we can't handle multiple jobs,
        // neither in monitoring nor execution.
        String runId = "1";
        try {
            monitor.initialize(this.configuration, runId, executionPlan.toJsonList());
        } catch (Exception e) {
            // Monitoring is best-effort: a failure here must not abort the job. The exception is
            // passed as the trailing throwable argument (not as a placeholder value) so that the
            // logging backend emits its stack trace.
            this.logger.warn("Failed to initialize monitor.", e);
        }

        // Take care of the execution: as long as execute(...) returns false, run a
        // post-processing round (timed as part of the optimization) and try again.
        while (!this.execute(executionPlan, executionId)) {
            this.optimizationRound.start();
            if (this.postProcess(executionPlan, executionId)) {
                // postProcess(...) returned true: move on to the next execution ID and
                // capture the execution plan again under that new ID.
                executionId++;
                this.recordExecutionPlanMeasurement(executionPlan, executionId);
            }
            this.optimizationRound.stop();
        }

        this.stopWatch.start("Post-processing");
        if (this.configuration.getBooleanProperty("wayang.core.log.enabled")) {
            this.logExecution();
        }
    } catch (WayangException e) {
        throw e;
    } catch (Throwable t) {
        throw new WayangException("Job execution failed.", t);
    } finally {
        // Stop whatever rounds are still running (e.g., after a failure) before timing the cleanup.
        this.stopWatch.stopAll();
        this.stopWatch.start("Post-processing", "Release Resources");
        try {
            this.releaseResources();
        } finally {
            // Even if releasing resources fails, stop the round and report the timings;
            // the failure itself still propagates to the caller.
            this.stopWatch.stop("Post-processing");
            this.logger.info("StopWatch results:\n{}", this.stopWatch.toPrettyString());
        }
    }
}

/**
 * Captures the given execution plan as a measurement named {@code execution-plan-<executionId>}
 * and adds it to {@code experiment}; does nothing if no experiment is set.
 *
 * @param executionPlan the plan to capture
 * @param executionId   the ID used to name the measurement
 */
private void recordExecutionPlanMeasurement(ExecutionPlan executionPlan, int executionId) {
    if (this.experiment == null) {
        return;
    }
    this.experiment.addMeasurement(ExecutionPlanMeasurement.capture(
            executionPlan,
            String.format("execution-plan-%d", executionId)
    ));
}