in wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/Configuration.java [391:526]
private static void bootstrapLoadAndTimeEstimatorProviders(Configuration configuration) {
{
// Safety net: provide a fallback selectivity.
KeyValueProvider<ExecutionOperator, LoadProfileEstimator> fallbackProvider =
new FunctionalKeyValueProvider<ExecutionOperator, LoadProfileEstimator>(
(operator, requestee) -> {
final Configuration conf = requestee.getConfiguration();
return new NestableLoadProfileEstimator(
IntervalLoadEstimator.createIOLinearEstimator(
null,
conf.getLongProperty("wayang.core.fallback.udf.cpu.lower"),
conf.getLongProperty("wayang.core.fallback.udf.cpu.upper"),
conf.getDoubleProperty("wayang.core.fallback.udf.cpu.confidence"),
CardinalityEstimate.EMPTY_ESTIMATE
),
IntervalLoadEstimator.createIOLinearEstimator(
null,
conf.getLongProperty("wayang.core.fallback.udf.ram.lower"),
conf.getLongProperty("wayang.core.fallback.udf.ram.upper"),
conf.getDoubleProperty("wayang.core.fallback.udf.ram.confidence"),
CardinalityEstimate.EMPTY_ESTIMATE
)
);
},
configuration
).withSlf4jWarning("Creating fallback load estimator for {}.");
// Built-in option: let the ExecutionOperators provide the LoadProfileEstimator.
KeyValueProvider<ExecutionOperator, LoadProfileEstimator> builtInProvider =
new FunctionalKeyValueProvider<>(
fallbackProvider,
(operator, requestee) -> operator.createLoadProfileEstimator(requestee.getConfiguration()).orElse(null)
);
// Customizable layer: Users can override manually.
KeyValueProvider<ExecutionOperator, LoadProfileEstimator> overrideProvider =
new MapBasedKeyValueProvider<>(builtInProvider);
configuration.setOperatorLoadProfileEstimatorProvider(overrideProvider);
}
{
// Safety net: provide a fallback selectivity.
KeyValueProvider<FunctionDescriptor, LoadProfileEstimator> fallbackProvider =
new FunctionalKeyValueProvider<FunctionDescriptor, LoadProfileEstimator>(
(operator, requestee) -> {
final Configuration conf = requestee.getConfiguration();
return new NestableLoadProfileEstimator(
IntervalLoadEstimator.createIOLinearEstimator(
null,
conf.getLongProperty("wayang.core.fallback.operator.cpu.lower"),
conf.getLongProperty("wayang.core.fallback.operator.cpu.upper"),
conf.getDoubleProperty("wayang.core.fallback.operator.cpu.confidence"),
CardinalityEstimate.EMPTY_ESTIMATE
),
IntervalLoadEstimator.createIOLinearEstimator(
null,
conf.getLongProperty("wayang.core.fallback.operator.ram.lower"),
conf.getLongProperty("wayang.core.fallback.operator.ram.upper"),
conf.getDoubleProperty("wayang.core.fallback.operator.ram.confidence"),
CardinalityEstimate.EMPTY_ESTIMATE
)
);
},
configuration
).withSlf4jWarning("Creating fallback load estimator for {}.");
// Built-in layer: let the FunctionDescriptors provide the LoadProfileEstimators themselves.
KeyValueProvider<FunctionDescriptor, LoadProfileEstimator> builtInProvider =
new FunctionalKeyValueProvider<>(
fallbackProvider,
functionDescriptor -> functionDescriptor.getLoadProfileEstimator().orElse(null)
);
// Customizable layer: Users can override manually.
KeyValueProvider<FunctionDescriptor, LoadProfileEstimator> overrideProvider =
new MapBasedKeyValueProvider<>(builtInProvider);
configuration.setFunctionLoadProfileEstimatorProvider(overrideProvider);
}
{
// Safety net: provide a fallback start up costs.
final KeyValueProvider<Platform, Long> builtinProvider = new FunctionalKeyValueProvider<>(
(platform, requestee) -> platform.getInitializeMillis(requestee.getConfiguration()),
configuration
);
// Override layer.
KeyValueProvider<Platform, Long> overrideProvider = new MapBasedKeyValueProvider<>(builtinProvider);
configuration.setPlatformStartUpTimeProvider(overrideProvider);
}
{
// Safety net: provide a fallback start up costs.
final KeyValueProvider<Platform, LoadProfileToTimeConverter> fallbackProvider =
new FunctionalKeyValueProvider<Platform, LoadProfileToTimeConverter>(
platform -> LoadProfileToTimeConverter.createDefault(
LoadToTimeConverter.createLinearCoverter(0.0000005), // 1 CPU with 2 GHz
LoadToTimeConverter.createLinearCoverter(0.00001), // 10 ms to read/write 1 MB
LoadToTimeConverter.createLinearCoverter(0.00001), // 10 ms to receive/send 1 MB
(cpuEstimate, diskEstimate, networkEstimate) -> cpuEstimate.plus(diskEstimate).plus(networkEstimate)
),
configuration
)
.withSlf4jWarning("Using fallback load-to-time converter for {}.");
final KeyValueProvider<Platform, LoadProfileToTimeConverter> defaultProvider =
new FunctionalKeyValueProvider<>(
fallbackProvider,
(platform, requestee) -> platform.createLoadProfileToTimeConverter(
requestee.getConfiguration()
)
);
final KeyValueProvider<Platform, LoadProfileToTimeConverter> overrideProvider =
new MapBasedKeyValueProvider<>(defaultProvider, false);
configuration.setLoadProfileToTimeConverterProvider(overrideProvider);
}
{
// Safety net: provide a fallback start up costs.
final KeyValueProvider<Platform, TimeToCostConverter> fallbackProvider =
new FunctionalKeyValueProvider<Platform, TimeToCostConverter>(
platform -> new TimeToCostConverter(0d, 1d),
configuration
).withSlf4jWarning("Using fallback time-to-cost converter for {}.");
final KeyValueProvider<Platform, TimeToCostConverter> builtInProvider =
new FunctionalKeyValueProvider<>(
fallbackProvider,
(platform, requestee) -> platform.createTimeToCostConverter(
requestee.getConfiguration()
)
);
final KeyValueProvider<Platform, TimeToCostConverter> overrideProvider =
new MapBasedKeyValueProvider<>(builtInProvider, false);
configuration.setTimeToCostConverterProvider(overrideProvider);
}
{
configuration.setLoadProfileEstimatorCache(new MapBasedKeyValueProvider<>(configuration, true));
}
}