protected void doExecute()

in wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/Job.java [273:338]


    /**
     * Runs this job once: prepares the plan, estimates its key figures, creates an initial
     * {@code ExecutionPlan} and then executes it in rounds until {@code execute(...)} returns
     * {@code true}. After each unfinished round, {@code postProcess(...)} is invoked inside a
     * new optimization round.
     * <p>
     * Resources are released and the stop watch results are logged in all cases, whether the
     * execution succeeds or fails.
     *
     * @throws WayangException if this job has already been executed, or if any step fails;
     *                         throwables that are not {@code WayangException}s are wrapped
     */
    protected void doExecute() {
        // Make sure that each job is only executed once.
        if (this.hasBeenExecuted.getAndSet(true)) {
            throw new WayangException("Job has already been executed.");
        }

        try {

            // Prepare the #wayangPlan for the optimization.
            this.optimizationRound.start();
            this.prepareWayangPlan();

            // Estimate cardinalities and execution times for the #wayangPlan.
            this.estimateKeyFigures();

            // Get an execution plan.
            int executionId = 0;
            ExecutionPlan executionPlan = this.createInitialExecutionPlan();
            this.optimizationRound.stop();
            if (this.experiment != null) {
                this.experiment.addMeasurement(ExecutionPlanMeasurement.capture(
                        executionPlan,
                        String.format("execution-plan-%d", executionId)
                ));
            }

            // TODO: generate run ID. For now we fix this because we can't handle multiple jobs,
            // neither in monitoring nor in execution.
            String runId = "1";
            try {
                monitor.initialize(this.configuration, runId, executionPlan.toJsonList());
            } catch (Exception e) {
                // Monitoring is best-effort: a failure here must not abort the job.
                // The message fills the placeholder; the trailing throwable supplies the stack trace.
                this.logger.warn("Failed to initialize monitor: {}", e.getMessage(), e);
            }


            // Take care of the execution: keep going until execute(...) reports true.
            while (!this.execute(executionPlan, executionId)) {
                this.optimizationRound.start();
                // Only when postProcess(...) returns true do we advance the execution ID and
                // record the plan again (presumably because the plan was updated -- confirm
                // against postProcess).
                if (this.postProcess(executionPlan, executionId)) {
                    executionId++;
                    if (this.experiment != null) {
                        this.experiment.addMeasurement(ExecutionPlanMeasurement.capture(
                                executionPlan,
                                String.format("execution-plan-%d", executionId)
                        ));
                    }
                }
                this.optimizationRound.stop();
            }

            this.stopWatch.start("Post-processing");
            if (this.configuration.getBooleanProperty("wayang.core.log.enabled")) {
                this.logExecution();
            }
        } catch (WayangException e) {
            // Already the expected exception type: pass it through unwrapped.
            throw e;
        } catch (Throwable t) {
            throw new WayangException("Job execution failed.", t);
        } finally {
            // Stop whatever rounds are still running (e.g., after a failure) before timing the cleanup.
            this.stopWatch.stopAll();
            this.stopWatch.start("Post-processing", "Release Resources");
            try {
                this.releaseResources();
            } finally {
                // Report the timings even if releasing the resources fails.
                this.stopWatch.stop("Post-processing");
                this.logger.info("StopWatch results:\n{}", this.stopWatch.toPrettyString());
            }
        }
    }