in src/main/java/org/apache/sysds/resource/ResourceOptimizer.java [100:359]
/**
 * Builds an {@link Enumerator} from the command line and the resource optimizer's options file.
 * <p>
 * Script arguments come from the command line. Each positional {@code -args} value becomes
 * {@code $1..$n}; empty values are skipped but keep their position number. Each {@code -nvargs}
 * {@code K=V} pair becomes {@code $K}. All other settings are read from {@code options}. Unset
 * keys are treated as empty strings and fall back to the defaults listed below:
 * <ul>
 *   <li>{@code ENUMERATION}: {@code grid} (default), {@code interest} or {@code prune}</li>
 *   <li>{@code OPTIMIZATION_FUNCTION}: {@code costs} (default), {@code time} (requires
 *       {@code MAX_PRICE} &gt; 0) or {@code price} (requires {@code MAX_TIME} &gt; 0)</li>
 *   <li>{@code COSTS_WEIGHT}: a factor within [0, 1], used only with {@code costs}</li>
 *   <li>{@code CPU_QUOTA}: at least 32 cores</li>
 *   <li>{@code MIN_EXECUTORS}/{@code MAX_EXECUTORS}: -1 when unset; min must not exceed a given max</li>
 *   <li>{@code STEP_SIZE} (&gt;= 1) and {@code EXPONENTIAL_BASE}: used only with {@code grid}</li>
 *   <li>{@code USE_LARGEST_ESTIMATE}, {@code USE_CP_ESTIMATES}, {@code USE_BROADCASTS},
 *       {@code USE_OUTPUTS}: used only with {@code interest}</li>
 *   <li>{@code LOCAL_INPUTS}: comma-separated {@code <argument value>=<local path>} pairs whose
 *       local paths are used instead of the given argument values for compilation</li>
 * </ul>
 * Options that do not apply to the chosen strategy produce a warning on {@code System.err}.
 * <p>
 * Several settings are applied through static setters on {@link Enumerator}
 * ({@code setCostsWeightFactor}, {@code setMinPrice}, {@code setMinTime}, {@code setCpuQuota}).
 * They therefore change class-level state rather than only the returned instance.
 *
 * @param line    parsed command line; {@code -f} names the script to compile
 * @param options resource optimizer options
 * @return the enumerator configured for the compiled runtime program
 * @throws ParseException if an argument or option is missing, malformed or out of range
 * @throws IOException    declared for the table-loading / compilation helpers (presumably raised
 *                        when reading the instance or region tables or the script - confirm)
 * @throws RuntimeException if a {@code LOCAL_INPUTS} entry is malformed or matches no argument
 */
public static Enumerator initEnumerator(CommandLine line, PropertiesConfiguration options) throws ParseException, IOException {
	// step 1: parse script arguments
	HashMap<String, String> argsMap = new HashMap<>();
	if (line.hasOption("args")) {
		String[] argValues = line.getOptionValues("args");
		for (int k = 0; k < argValues.length; k++) {
			String str = argValues[k];
			// empty values are skipped, but the following values keep their original position number
			if (!str.isEmpty()) {
				argsMap.put("$" + (k + 1), str);
			}
		}
	}
	if (line.hasOption("nvargs")) {
		String varNameRegex = "^[a-zA-Z]([a-zA-Z0-9_])*$";
		String[] nvargValues = line.getOptionValues("nvargs");
		for (String str : nvargValues) {
			if (!str.isEmpty()) {
				String[] kv = str.split("=");
				if (kv.length != 2) {
					throw new org.apache.commons.cli.ParseException("Invalid argument specified for -nvargs option, must be a list of space separated K=V pairs, where K is a valid name of a variable in the DML/PyDML program");
				}
				if (!kv[0].matches(varNameRegex)) {
					throw new org.apache.commons.cli.ParseException("Invalid argument specified for -nvargs option, " + kv[0] + " does not seem like a valid variable name in DML. Valid variable names in DML start with upper-case or lower-case letter, and contain only letters, digits, or underscores");
				}
				argsMap.put("$" + kv[0], kv[1]);
			}
		}
	}
	// load the rest of the options variables (an unset key yields "")
	String regionOpt = getOrDefault(options, "REGION", "");
	String infoTablePathOpt = getOrDefault(options, "INFO_TABLE", "");
	String regionTablePathOpt = getOrDefault(options, "REGION_TABLE", "");
	String localInputsOpt = getOrDefault(options, "LOCAL_INPUTS", "");
	String enumerationOpt = getOrDefault(options, "ENUMERATION", "");
	String optimizationOpt = getOrDefault(options, "OPTIMIZATION_FUNCTION", "");
	String costsWeightOpt = getOrDefault(options, "COSTS_WEIGHT", "");
	String maxTimeOpt = getOrDefault(options, "MAX_TIME", "");
	String maxPriceOpt = getOrDefault(options, "MAX_PRICE", "");
	String cpuQuotaOpt = getOrDefault(options, "CPU_QUOTA", "");
	String minExecutorsOpt = getOrDefault(options, "MIN_EXECUTORS", "");
	String maxExecutorsOpt = getOrDefault(options, "MAX_EXECUTORS", "");
	String instanceFamiliesOpt = getOrDefault(options, "INSTANCE_FAMILIES", "");
	String instanceSizesOpt = getOrDefault(options, "INSTANCE_SIZES", "");
	String stepSizeOpt = getOrDefault(options, "STEP_SIZE", "");
	String expBaseOpt = getOrDefault(options, "EXPONENTIAL_BASE", "");
	String useLargestEstOpt = getOrDefault(options, "USE_LARGEST_ESTIMATE", "");
	String useCpEstOpt = getOrDefault(options, "USE_CP_ESTIMATES", "");
	String useBroadcastOpt = getOrDefault(options, "USE_BROADCASTS", "");
	String useOutputsOpt = getOrDefault(options, "USE_OUTPUTS", "");
	// replace declared S3 files with local paths for compilation
	HashMap<String, String> localInputMap = new HashMap<>();
	if (!localInputsOpt.isEmpty()) {
		for (String pair : localInputsOpt.split(",")) {
			String[] pairParts = pair.split("=");
			if (pairParts.length != 2) {
				throw new RuntimeException("Invalid local variable pairs declaration: " + pair);
			}
			String declaredPath = pairParts[0];
			String localPath = pairParts[1];
			if (!argsMap.containsValue(declaredPath)) {
				throw new RuntimeException("Option for local input does not match any given argument: " + declaredPath);
			}
			String argName = getKeyByValue(argsMap, declaredPath);
			// update variables for compilation
			argsMap.put(argName, localPath);
			// remember local -> declared path for the replacement back after the first compilation
			localInputMap.put(localPath, declaredPath);
		}
	}
	// replace S3 filesystem identifier to match the available hadoop connector if needed
	if (argsMap.values().stream().anyMatch(value -> value.startsWith("s3"))) {
		String s3Filesystem = getAvailableHadoopS3Filesystem();
		replaceS3Filesystem(argsMap, s3Filesystem);
	}
	// materialize the options
	Enumerator.EnumerationStrategy strategy;
	switch (enumerationOpt) {
		case "": // not set: grid-based enumeration is the default
		case "grid":
			strategy = Enumerator.EnumerationStrategy.GridBased;
			break;
		case "interest":
			strategy = Enumerator.EnumerationStrategy.InterestBased;
			break;
		case "prune":
			strategy = Enumerator.EnumerationStrategy.PruneBased;
			break;
		default:
			throw new ParseException("Unsupported identifier for enumeration strategy: " + enumerationOpt);
	}
	Enumerator.OptimizationStrategy optimizedFor;
	switch (optimizationOpt) {
		case "": // not set: minimizing the weighted costs is the default
		case "costs":
			optimizedFor = Enumerator.OptimizationStrategy.MinCosts;
			break;
		case "time":
			optimizedFor = Enumerator.OptimizationStrategy.MinTime;
			break;
		case "price":
			optimizedFor = Enumerator.OptimizationStrategy.MinPrice;
			break;
		default:
			throw new ParseException("Unsupported identifier for optimization strategy: " + optimizationOpt);
	}
	if (!costsWeightOpt.isEmpty()) {
		if (optimizedFor == Enumerator.OptimizationStrategy.MinCosts) {
			double costsWeightFactor = parseDoubleOptionValue("COSTS_WEIGHT", costsWeightOpt);
			// written as a negated range check so that NaN is rejected as well
			if (!(costsWeightFactor >= 0.0 && costsWeightFactor <= 1.0)) {
				throw new ParseException("Invalid value for option COSTS_WEIGHT: " + costsWeightOpt
					+ " (must be within the range [0, 1])");
			}
			Enumerator.setCostsWeightFactor(costsWeightFactor);
		} else {
			System.err.println("Warning: option COSTS_WEIGHT is relevant only for OPTIMIZATION_FUNCTION 'costs'");
		}
	}
	if (optimizedFor == Enumerator.OptimizationStrategy.MinTime) {
		if (maxPriceOpt.isEmpty()) {
			throw new ParseException("Providing the option MAX_PRICE value is required " +
				"when OPTIMIZATION_FUNCTION is set to 'time'");
		}
		double priceConstraint = parseDoubleOptionValue("MAX_PRICE", maxPriceOpt);
		// negated comparison also rejects NaN
		if (!(priceConstraint > 0)) {
			throw new ParseException("Invalid value for option MAX_PRICE " +
				"when option OPTIMIZATION_FUNCTION is set to 'time'");
		}
		// NOTE(review): the MAX_PRICE bound is passed to setMinPrice - confirm the setter's naming in Enumerator
		Enumerator.setMinPrice(priceConstraint);
	} else if (!maxPriceOpt.isEmpty()) {
		System.err.println("Warning: option MAX_PRICE is relevant only for OPTIMIZATION_FUNCTION 'time'");
	}
	if (optimizedFor == Enumerator.OptimizationStrategy.MinPrice) {
		if (maxTimeOpt.isEmpty()) {
			throw new ParseException("Providing the option MAX_TIME value is required " +
				"when OPTIMIZATION_FUNCTION is set to 'price'");
		}
		double timeConstraint = parseDoubleOptionValue("MAX_TIME", maxTimeOpt);
		// negated comparison also rejects NaN
		if (!(timeConstraint > 0)) {
			throw new ParseException("Invalid value for option MAX_TIME " +
				"when option OPTIMIZATION_FUNCTION is set to 'price'");
		}
		// NOTE(review): the MAX_TIME bound is passed to setMinTime - confirm the setter's naming in Enumerator
		Enumerator.setMinTime(timeConstraint);
	} else if (!maxTimeOpt.isEmpty()) {
		System.err.println("Warning: option MAX_TIME is relevant only for OPTIMIZATION_FUNCTION 'price'");
	}
	if (!cpuQuotaOpt.isEmpty()) {
		int quotaForNumCores = parseIntOptionValue("CPU_QUOTA", cpuQuotaOpt);
		if (quotaForNumCores < 32) {
			throw new ParseException("CPU quota of under 32 number of cores is not allowed");
		}
		Enumerator.setCpuQuota(quotaForNumCores);
	}
	// -1 is used when a bound is not set
	int minExecutors = minExecutorsOpt.isEmpty() ? -1 : parseIntOptionValue("MIN_EXECUTORS", minExecutorsOpt);
	int maxExecutors = maxExecutorsOpt.isEmpty() ? -1 : parseIntOptionValue("MAX_EXECUTORS", maxExecutorsOpt);
	// validated here, before the expensive table loading and compilation below
	if (maxExecutors >= 0 && minExecutors > maxExecutors) {
		throw new ParseException("Option for MAX_EXECUTORS should be always greater or equal the option for MIN_EXECUTORS");
	}
	String[] instanceFamilies = instanceFamiliesOpt.isEmpty() ? null : instanceFamiliesOpt.split(",");
	String[] instanceSizes = instanceSizesOpt.isEmpty() ? null : instanceSizesOpt.split(",");
	// parse arguments specific to enumeration strategies
	int stepSize = 1;
	int expBase = -1;
	if (strategy == Enumerator.EnumerationStrategy.GridBased) {
		if (!stepSizeOpt.isEmpty()) {
			stepSize = parseIntOptionValue("STEP_SIZE", stepSizeOpt);
			if (stepSize < 1) {
				throw new ParseException("Invalid option for STEP_SIZE");
			}
		}
		if (!expBaseOpt.isEmpty()) {
			expBase = parseIntOptionValue("EXPONENTIAL_BASE", expBaseOpt);
		}
	} else {
		if (!stepSizeOpt.isEmpty())
			System.err.println("Warning: option STEP_SIZE is relevant only for option ENUMERATION 'grid'");
		if (!expBaseOpt.isEmpty())
			System.err.println("Warning: option EXPONENTIAL_BASE is relevant only for option ENUMERATION 'grid'");
	}
	boolean interestLargestEstimate = true;
	boolean interestEstimatesInCP = true;
	boolean interestBroadcastVars = true;
	boolean interestOutputCaching = false;
	if (strategy == Enumerator.EnumerationStrategy.InterestBased) {
		if (!useLargestEstOpt.isEmpty())
			interestLargestEstimate = Boolean.parseBoolean(useLargestEstOpt);
		if (!useCpEstOpt.isEmpty())
			interestEstimatesInCP = Boolean.parseBoolean(useCpEstOpt);
		if (!useBroadcastOpt.isEmpty())
			interestBroadcastVars = Boolean.parseBoolean(useBroadcastOpt);
		if (!useOutputsOpt.isEmpty())
			interestOutputCaching = Boolean.parseBoolean(useOutputsOpt);
	} else {
		if (!useLargestEstOpt.isEmpty())
			System.err.println("Warning: option USE_LARGEST_ESTIMATE is relevant only for option ENUMERATION 'interest'");
		if (!useCpEstOpt.isEmpty())
			System.err.println("Warning: option USE_CP_ESTIMATES is relevant only for option ENUMERATION 'interest'");
		if (!useBroadcastOpt.isEmpty())
			System.err.println("Warning: option USE_BROADCASTS is relevant only for option ENUMERATION 'interest'");
		if (!useOutputsOpt.isEmpty())
			System.err.println("Warning: option USE_OUTPUTS is relevant only for option ENUMERATION 'interest'");
	}
	double[] regionalPrices = CloudUtils.loadRegionalPrices(regionTablePathOpt, regionOpt);
	HashMap<String, CloudInstance> allInstances = CloudUtils.loadInstanceInfoTable(infoTablePathOpt, regionalPrices[0], regionalPrices[1]);
	// step 2: compile the initial runtime program
	Program sourceProgram = ResourceCompiler.compile(line.getOptionValue("f"), argsMap, localInputMap);
	// step 3: initialize the enumerator
	// set the mandatory setting
	Enumerator.Builder builder = new Enumerator.Builder()
		.withRuntimeProgram(sourceProgram)
		.withAvailableInstances(allInstances)
		.withEnumerationStrategy(strategy)
		.withOptimizationStrategy(optimizedFor);
	// set min and max number of executors (validated above)
	builder.withNumberExecutorsRange(minExecutors, maxExecutors);
	// set range of instance types
	try {
		if (instanceFamilies != null)
			builder.withInstanceFamilyRange(instanceFamilies);
	} catch (IllegalArgumentException e) {
		throw new ParseException("Not all provided options for INSTANCE_FAMILIES are supported or valid. Error thrown at:\n" + e.getMessage());
	}
	// set range of instance sizes
	try {
		if (instanceSizes != null)
			builder.withInstanceSizeRange(instanceSizes);
	} catch (IllegalArgumentException e) {
		throw new ParseException("Not all provided options for INSTANCE_SIZES are supported or valid. Error thrown at:\n" + e.getMessage());
	}
	// set step size and exponential base for grid-based enum.
	if (strategy == Enumerator.EnumerationStrategy.GridBased) {
		// a step size of 1 is left to the builder's own default
		if (stepSize > 1) {
			builder.withStepSizeExecutor(stepSize);
		}
		builder.withExpBaseExecutors(expBase);
	}
	// set flags for interest-based enum.
	if (strategy == Enumerator.EnumerationStrategy.InterestBased) {
		builder.withInterestLargestEstimate(interestLargestEstimate)
			.withInterestEstimatesInCP(interestEstimatesInCP)
			.withInterestBroadcastVars(interestBroadcastVars)
			.withInterestOutputCaching(interestOutputCaching);
	}
	// build the enumerator
	return builder.build();
}

/**
 * Parses an integer-valued option and reports a malformed value as a {@link ParseException}
 * naming the option, instead of letting a bare {@link NumberFormatException} escape.
 *
 * @param optionName option key, used only for the error message
 * @param value      raw option value
 * @return the parsed integer
 * @throws ParseException if {@code value} is not a valid integer
 */
private static int parseIntOptionValue(String optionName, String value) throws ParseException {
	try {
		return Integer.parseInt(value);
	} catch (NumberFormatException e) {
		ParseException pe = new ParseException("Invalid integer value for option " + optionName + ": " + value);
		// attach the original exception via initCause, since the ParseException constructor takes only a message
		pe.initCause(e);
		throw pe;
	}
}

/**
 * Parses a floating-point option and reports a malformed value as a {@link ParseException}
 * naming the option, instead of letting a bare {@link NumberFormatException} escape.
 *
 * @param optionName option key, used only for the error message
 * @param value      raw option value
 * @return the parsed double
 * @throws ParseException if {@code value} is not a valid number
 */
private static double parseDoubleOptionValue(String optionName, String value) throws ParseException {
	try {
		return Double.parseDouble(value);
	} catch (NumberFormatException e) {
		ParseException pe = new ParseException("Invalid numeric value for option " + optionName + ": " + value);
		// attach the original exception via initCause, since the ParseException constructor takes only a message
		pe.initCause(e);
		throw pe;
	}
}