private static void bootstrapLoadAndTimeEstimatorProviders()

in wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/Configuration.java [391:526]


    private static void bootstrapLoadAndTimeEstimatorProviders(Configuration configuration) {
        {
            // Safety net: provide a fallback selectivity.
            KeyValueProvider<ExecutionOperator, LoadProfileEstimator> fallbackProvider =
                    new FunctionalKeyValueProvider<ExecutionOperator, LoadProfileEstimator>(
                            (operator, requestee) -> {
                                final Configuration conf = requestee.getConfiguration();
                                return new NestableLoadProfileEstimator(
                                        IntervalLoadEstimator.createIOLinearEstimator(
                                                null,
                                                conf.getLongProperty("wayang.core.fallback.udf.cpu.lower"),
                                                conf.getLongProperty("wayang.core.fallback.udf.cpu.upper"),
                                                conf.getDoubleProperty("wayang.core.fallback.udf.cpu.confidence"),
                                                CardinalityEstimate.EMPTY_ESTIMATE
                                        ),
                                        IntervalLoadEstimator.createIOLinearEstimator(
                                                null,
                                                conf.getLongProperty("wayang.core.fallback.udf.ram.lower"),
                                                conf.getLongProperty("wayang.core.fallback.udf.ram.upper"),
                                                conf.getDoubleProperty("wayang.core.fallback.udf.ram.confidence"),
                                                CardinalityEstimate.EMPTY_ESTIMATE
                                        )
                                );
                            },
                            configuration
                    ).withSlf4jWarning("Creating fallback load estimator for {}.");

            // Built-in option: let the ExecutionOperators provide the LoadProfileEstimator.
            KeyValueProvider<ExecutionOperator, LoadProfileEstimator> builtInProvider =
                    new FunctionalKeyValueProvider<>(
                            fallbackProvider,
                            (operator, requestee) -> operator.createLoadProfileEstimator(requestee.getConfiguration()).orElse(null)
                    );

            // Customizable layer: Users can override manually.
            KeyValueProvider<ExecutionOperator, LoadProfileEstimator> overrideProvider =
                    new MapBasedKeyValueProvider<>(builtInProvider);

            configuration.setOperatorLoadProfileEstimatorProvider(overrideProvider);
        }
        {
            // Safety net: provide a fallback selectivity.
            KeyValueProvider<FunctionDescriptor, LoadProfileEstimator> fallbackProvider =
                    new FunctionalKeyValueProvider<FunctionDescriptor, LoadProfileEstimator>(
                            (operator, requestee) -> {
                                final Configuration conf = requestee.getConfiguration();
                                return new NestableLoadProfileEstimator(
                                        IntervalLoadEstimator.createIOLinearEstimator(
                                                null,
                                                conf.getLongProperty("wayang.core.fallback.operator.cpu.lower"),
                                                conf.getLongProperty("wayang.core.fallback.operator.cpu.upper"),
                                                conf.getDoubleProperty("wayang.core.fallback.operator.cpu.confidence"),
                                                CardinalityEstimate.EMPTY_ESTIMATE
                                        ),
                                        IntervalLoadEstimator.createIOLinearEstimator(
                                                null,
                                                conf.getLongProperty("wayang.core.fallback.operator.ram.lower"),
                                                conf.getLongProperty("wayang.core.fallback.operator.ram.upper"),
                                                conf.getDoubleProperty("wayang.core.fallback.operator.ram.confidence"),
                                                CardinalityEstimate.EMPTY_ESTIMATE
                                        )
                                );
                            },
                            configuration
                    ).withSlf4jWarning("Creating fallback load estimator for {}.");

            // Built-in layer: let the FunctionDescriptors provide the LoadProfileEstimators themselves.
            KeyValueProvider<FunctionDescriptor, LoadProfileEstimator> builtInProvider =
                    new FunctionalKeyValueProvider<>(
                            fallbackProvider,
                            functionDescriptor -> functionDescriptor.getLoadProfileEstimator().orElse(null)
                    );

            // Customizable layer: Users can override manually.
            KeyValueProvider<FunctionDescriptor, LoadProfileEstimator> overrideProvider =
                    new MapBasedKeyValueProvider<>(builtInProvider);

            configuration.setFunctionLoadProfileEstimatorProvider(overrideProvider);
        }
        {
            // Safety net: provide a fallback start up costs.
            final KeyValueProvider<Platform, Long> builtinProvider = new FunctionalKeyValueProvider<>(
                    (platform, requestee) -> platform.getInitializeMillis(requestee.getConfiguration()),
                    configuration
            );

            // Override layer.
            KeyValueProvider<Platform, Long> overrideProvider = new MapBasedKeyValueProvider<>(builtinProvider);
            configuration.setPlatformStartUpTimeProvider(overrideProvider);
        }
        {
            // Safety net: provide a fallback start up costs.
            final KeyValueProvider<Platform, LoadProfileToTimeConverter> fallbackProvider =
                    new FunctionalKeyValueProvider<Platform, LoadProfileToTimeConverter>(
                            platform -> LoadProfileToTimeConverter.createDefault(
                                    LoadToTimeConverter.createLinearCoverter(0.0000005), // 1 CPU with 2 GHz
                                    LoadToTimeConverter.createLinearCoverter(0.00001), // 10 ms to read/write 1 MB
                                    LoadToTimeConverter.createLinearCoverter(0.00001),  // 10 ms to receive/send 1 MB
                                    (cpuEstimate, diskEstimate, networkEstimate) -> cpuEstimate.plus(diskEstimate).plus(networkEstimate)
                            ),
                            configuration
                    )
                            .withSlf4jWarning("Using fallback load-to-time converter for {}.");
            final KeyValueProvider<Platform, LoadProfileToTimeConverter> defaultProvider =
                    new FunctionalKeyValueProvider<>(
                            fallbackProvider,
                            (platform, requestee) -> platform.createLoadProfileToTimeConverter(
                                    requestee.getConfiguration()
                            )
                    );
            final KeyValueProvider<Platform, LoadProfileToTimeConverter> overrideProvider =
                    new MapBasedKeyValueProvider<>(defaultProvider, false);
            configuration.setLoadProfileToTimeConverterProvider(overrideProvider);
        }
        {
            // Safety net: provide a fallback start up costs.
            final KeyValueProvider<Platform, TimeToCostConverter> fallbackProvider =
                    new FunctionalKeyValueProvider<Platform, TimeToCostConverter>(
                            platform -> new TimeToCostConverter(0d, 1d),
                            configuration
                    ).withSlf4jWarning("Using fallback time-to-cost converter for {}.");
            final KeyValueProvider<Platform, TimeToCostConverter> builtInProvider =
                    new FunctionalKeyValueProvider<>(
                            fallbackProvider,
                            (platform, requestee) -> platform.createTimeToCostConverter(
                                    requestee.getConfiguration()
                            )
                    );
            final KeyValueProvider<Platform, TimeToCostConverter> overrideProvider =
                    new MapBasedKeyValueProvider<>(builtInProvider, false);
            configuration.setTimeToCostConverterProvider(overrideProvider);
        }
        {
            configuration.setLoadProfileEstimatorCache(new MapBasedKeyValueProvider<>(configuration, true));
        }
    }