private void logExecution()

in wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/Job.java [665:797]


    private void logExecution() {
        this.stopWatch.start("Post-processing", "Log measurements");

        // For the last time, update the cardinalities and store them.
        this.reestimateCardinalities(this.crossPlatformExecutor);
        final CardinalityRepository cardinalityRepository = this.wayangContext.getCardinalityRepository();
        cardinalityRepository.storeAll(this.crossPlatformExecutor, this.optimizationContext);

        // Execution times.
        final Collection<PartialExecution> partialExecutions = this.crossPlatformExecutor.getPartialExecutions();

        // Add the execution times to the experiment.
        int nextPartialExecutionMeasurementId = 0;
        for (PartialExecution partialExecution : partialExecutions) {
            if (this.logger.isDebugEnabled()) {
                for (AtomicExecutionGroup atomicExecutionGroup : partialExecution.getAtomicExecutionGroups()) {
                    if (!(atomicExecutionGroup.getEstimationContext() instanceof OptimizationContext.OperatorContext)) {
                        continue;
                    }
                    OptimizationContext.OperatorContext operatorContext =
                            (OptimizationContext.OperatorContext) atomicExecutionGroup.getEstimationContext();

                    for (CardinalityEstimate cardinality : operatorContext.getInputCardinalities()) {
                        if (cardinality != null && !cardinality.isExact()) {
                            this.logger.debug(
                                    "Inexact input cardinality estimate {} for {}.",
                                    cardinality, operatorContext.getOperator()
                            );
                        }
                    }
                    for (CardinalityEstimate cardinality : operatorContext.getOutputCardinalities()) {
                        if (cardinality != null && !cardinality.isExact()) {
                            this.logger.debug(
                                    "Inexact output cardinality estimate {} for {}.",
                                    cardinality, operatorContext.getOperator()
                            );
                        }
                    }
                }
            }
            String id = String.format("par-ex-%03d", nextPartialExecutionMeasurementId++);
            final PartialExecutionMeasurement measurement = new PartialExecutionMeasurement(id, partialExecution, this.configuration);
            this.experiment.addMeasurement(measurement);
        }

        // Feed the execution log.
        try (ExecutionLog executionLog = ExecutionLog.open(this.configuration)) {
            executionLog.storeAll(partialExecutions);
        } catch (Exception e) {
            this.logger.error("Storing partial executions failed.", e);
        }
        this.optimizationRound.stop("Post-processing", "Log measurements");

        // Log the execution time.
        long effectiveExecutionMillis = partialExecutions.stream()
                .map(PartialExecution::getMeasuredExecutionTime)
                .reduce(0L, (a, b) -> a + b);
        long measuredExecutionMillis = this.executionRound.getMillis();
        this.logger.info(
                "Accumulated execution time: {} (effective: {}, overhead: {})",
                Formats.formatDuration(measuredExecutionMillis, true),
                Formats.formatDuration(effectiveExecutionMillis, true),
                Formats.formatDuration(measuredExecutionMillis - effectiveExecutionMillis, true)
        );
        int i = 1;
        for (TimeEstimate timeEstimate : this.timeEstimates) {
            this.logger.info("Estimated execution time (plan {}): {}", i, timeEstimate);
            TimeMeasurement lowerEstimate = new TimeMeasurement(String.format("Estimate %d (lower)", i));
            lowerEstimate.setMillis(timeEstimate.getLowerEstimate());
            this.stopWatch.getExperiment().addMeasurement(lowerEstimate);
            TimeMeasurement upperEstimate = new TimeMeasurement(String.format("Estimate %d (upper)", i));
            upperEstimate.setMillis(timeEstimate.getUpperEstimate());
            this.stopWatch.getExperiment().addMeasurement(upperEstimate);
            i++;
        }

        // Log the cost settings.
        final Collection<Platform> consideredPlatforms = this.configuration.getPlatformProvider().provideAll();
        for (Platform consideredPlatform : consideredPlatforms) {
            final TimeToCostConverter timeToCostConverter = this.configuration
                    .getTimeToCostConverterProvider()
                    .provideFor(consideredPlatform);
            this.experiment.getSubject().addConfiguration(
                    String.format("Costs per ms (%s)", consideredPlatform.getName()),
                    timeToCostConverter.getCostsPerMillisecond()
            );
            this.experiment.getSubject().addConfiguration(
                    String.format("Fix costs (%s)", consideredPlatform.getName()),
                    timeToCostConverter.getFixCosts()
            );
        }


        // Log the execution costs.
        double fixCosts = partialExecutions.stream()
                .flatMap(partialExecution -> partialExecution.getInvolvedPlatforms().stream())
                .map(platform -> this.configuration.getTimeToCostConverterProvider().provideFor(platform).getFixCosts())
                .reduce(0d, (a, b) -> a + b);
        double effectiveLowerCosts = fixCosts + partialExecutions.stream()
                .map(PartialExecution::getMeasuredLowerCost)
                .reduce(0d, (a, b) -> a + b);
        double effectiveUpperCosts = fixCosts + partialExecutions.stream()
                .map(PartialExecution::getMeasuredUpperCost)
                .reduce(0d, (a, b) -> a + b);
        this.logger.info("Accumulated costs: {} .. {}",
                String.format("%,.2f", effectiveLowerCosts),
                String.format("%,.2f", effectiveUpperCosts)
        );
        this.experiment.addMeasurement(
                new CostMeasurement("Measured cost", effectiveLowerCosts, effectiveUpperCosts, 1d)
        );
        i = 1;
        for (ProbabilisticDoubleInterval costEstimate : this.costEstimates) {
            this.logger.info("Estimated costs (plan {}): {}", i, costEstimate);
            this.experiment.addMeasurement(new CostMeasurement(
                    String.format("Estimated costs (%d)", i),
                    costEstimate.getLowerEstimate(),
                    costEstimate.getUpperEstimate(),
                    costEstimate.getCorrectnessProbability()
            ));
            i++;
        }

        // Log some plan metrics.
        final PlanMetrics planMetrics = PlanMetrics.createFor(this.wayangPlan, "Plan Metrics");
        this.logger.info("Plan metrics: {} virtual operators, {} execution operators, {} alternatives, {} combinations",
                planMetrics.getNumVirtualOperators(),
                planMetrics.getNumExecutionOperators(),
                planMetrics.getNumAlternatives(),
                planMetrics.getNumCombinations()
        );
        this.experiment.addMeasurement(planMetrics);
    }