in wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/Job.java [665:797]
private void logExecution() {
this.stopWatch.start("Post-processing", "Log measurements");
// For the last time, update the cardinalities and store them.
this.reestimateCardinalities(this.crossPlatformExecutor);
final CardinalityRepository cardinalityRepository = this.wayangContext.getCardinalityRepository();
cardinalityRepository.storeAll(this.crossPlatformExecutor, this.optimizationContext);
// Execution times.
final Collection<PartialExecution> partialExecutions = this.crossPlatformExecutor.getPartialExecutions();
// Add the execution times to the experiment.
int nextPartialExecutionMeasurementId = 0;
for (PartialExecution partialExecution : partialExecutions) {
if (this.logger.isDebugEnabled()) {
for (AtomicExecutionGroup atomicExecutionGroup : partialExecution.getAtomicExecutionGroups()) {
if (!(atomicExecutionGroup.getEstimationContext() instanceof OptimizationContext.OperatorContext)) {
continue;
}
OptimizationContext.OperatorContext operatorContext =
(OptimizationContext.OperatorContext) atomicExecutionGroup.getEstimationContext();
for (CardinalityEstimate cardinality : operatorContext.getInputCardinalities()) {
if (cardinality != null && !cardinality.isExact()) {
this.logger.debug(
"Inexact input cardinality estimate {} for {}.",
cardinality, operatorContext.getOperator()
);
}
}
for (CardinalityEstimate cardinality : operatorContext.getOutputCardinalities()) {
if (cardinality != null && !cardinality.isExact()) {
this.logger.debug(
"Inexact output cardinality estimate {} for {}.",
cardinality, operatorContext.getOperator()
);
}
}
}
}
String id = String.format("par-ex-%03d", nextPartialExecutionMeasurementId++);
final PartialExecutionMeasurement measurement = new PartialExecutionMeasurement(id, partialExecution, this.configuration);
this.experiment.addMeasurement(measurement);
}
// Feed the execution log.
try (ExecutionLog executionLog = ExecutionLog.open(this.configuration)) {
executionLog.storeAll(partialExecutions);
} catch (Exception e) {
this.logger.error("Storing partial executions failed.", e);
}
this.optimizationRound.stop("Post-processing", "Log measurements");
// Log the execution time.
long effectiveExecutionMillis = partialExecutions.stream()
.map(PartialExecution::getMeasuredExecutionTime)
.reduce(0L, (a, b) -> a + b);
long measuredExecutionMillis = this.executionRound.getMillis();
this.logger.info(
"Accumulated execution time: {} (effective: {}, overhead: {})",
Formats.formatDuration(measuredExecutionMillis, true),
Formats.formatDuration(effectiveExecutionMillis, true),
Formats.formatDuration(measuredExecutionMillis - effectiveExecutionMillis, true)
);
int i = 1;
for (TimeEstimate timeEstimate : this.timeEstimates) {
this.logger.info("Estimated execution time (plan {}): {}", i, timeEstimate);
TimeMeasurement lowerEstimate = new TimeMeasurement(String.format("Estimate %d (lower)", i));
lowerEstimate.setMillis(timeEstimate.getLowerEstimate());
this.stopWatch.getExperiment().addMeasurement(lowerEstimate);
TimeMeasurement upperEstimate = new TimeMeasurement(String.format("Estimate %d (upper)", i));
upperEstimate.setMillis(timeEstimate.getUpperEstimate());
this.stopWatch.getExperiment().addMeasurement(upperEstimate);
i++;
}
// Log the cost settings.
final Collection<Platform> consideredPlatforms = this.configuration.getPlatformProvider().provideAll();
for (Platform consideredPlatform : consideredPlatforms) {
final TimeToCostConverter timeToCostConverter = this.configuration
.getTimeToCostConverterProvider()
.provideFor(consideredPlatform);
this.experiment.getSubject().addConfiguration(
String.format("Costs per ms (%s)", consideredPlatform.getName()),
timeToCostConverter.getCostsPerMillisecond()
);
this.experiment.getSubject().addConfiguration(
String.format("Fix costs (%s)", consideredPlatform.getName()),
timeToCostConverter.getFixCosts()
);
}
// Log the execution costs.
double fixCosts = partialExecutions.stream()
.flatMap(partialExecution -> partialExecution.getInvolvedPlatforms().stream())
.map(platform -> this.configuration.getTimeToCostConverterProvider().provideFor(platform).getFixCosts())
.reduce(0d, (a, b) -> a + b);
double effectiveLowerCosts = fixCosts + partialExecutions.stream()
.map(PartialExecution::getMeasuredLowerCost)
.reduce(0d, (a, b) -> a + b);
double effectiveUpperCosts = fixCosts + partialExecutions.stream()
.map(PartialExecution::getMeasuredUpperCost)
.reduce(0d, (a, b) -> a + b);
this.logger.info("Accumulated costs: {} .. {}",
String.format("%,.2f", effectiveLowerCosts),
String.format("%,.2f", effectiveUpperCosts)
);
this.experiment.addMeasurement(
new CostMeasurement("Measured cost", effectiveLowerCosts, effectiveUpperCosts, 1d)
);
i = 1;
for (ProbabilisticDoubleInterval costEstimate : this.costEstimates) {
this.logger.info("Estimated costs (plan {}): {}", i, costEstimate);
this.experiment.addMeasurement(new CostMeasurement(
String.format("Estimated costs (%d)", i),
costEstimate.getLowerEstimate(),
costEstimate.getUpperEstimate(),
costEstimate.getCorrectnessProbability()
));
i++;
}
// Log some plan metrics.
final PlanMetrics planMetrics = PlanMetrics.createFor(this.wayangPlan, "Plan Metrics");
this.logger.info("Plan metrics: {} virtual operators, {} execution operators, {} alternatives, {} combinations",
planMetrics.getNumVirtualOperators(),
planMetrics.getNumExecutionOperators(),
planMetrics.getNumAlternatives(),
planMetrics.getNumCombinations()
);
this.experiment.addMeasurement(planMetrics);
}