in compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/XGBoostPass.java [67:117]
/**
 * Applies the execution-property tunings described in the client message to the given DAG.
 *
 * <p>The message is read via {@code XGBoostPass.takeMessage()} and parsed as a JSON list of
 * string-to-string maps. Each entry whose {@code "feature"} value is exactly 9 characters long is
 * converted into a new execution property (built from its {@code "split"} and {@code "val"} values)
 * and set on the vertex or edge it names. If the DAG integrity check fails after setting the
 * property, the previously found property of that kind is set back.
 *
 * <p>Entries without a {@code "feature"} value, or with one of a different length, are skipped.
 * Tunings rejected with an illegal vertex/edge operation are logged and skipped.
 *
 * @param dag the DAG to tune; it is modified in place.
 * @return the same {@code dag} instance.
 * @throws CompileTimeOptimizationException wrapping any exception other than
 *         {@link InvalidParameterException} raised while reading or applying the message.
 */
public IRDAG apply(final IRDAG dag) {
  try {
    final String message = XGBoostPass.takeMessage();
    LOG.info("Received message from the client: {}", message);
    if (message.isEmpty()) {
      LOG.info("No optimization included in the message. Returning the original dag.");
      return dag;
    }

    final ObjectMapper mapper = new ObjectMapper();
    final List<Map<String, String>> listOfMap =
      mapper.readValue(message, new TypeReference<List<Map<String, String>>>() {
      });

    // Formatted into 9 digits: 0:vertex/edge 1-5:ID 5-9:EP Index.
    // Entries lacking a "feature" value are skipped just like those of the wrong length,
    // instead of failing the whole pass with a NullPointerException.
    listOfMap.stream()
      .filter(m -> {
        final String feature = m.get("feature");
        return feature != null && feature.length() == 9;
      })
      .forEach(m -> {
        final Pair<String, Integer> idAndEPKey = OptimizerUtils.stringToIdAndEPKeyIndex(m.get("feature"));
        LOG.info("Tuning: {} of {} should be {} than {}",
          idAndEPKey.right(), idAndEPKey.left(), m.get("val"), m.get("split"));
        final ExecutionProperty<? extends Serializable> newEP = MetricUtils.keyAndValueToEP(idAndEPKey.right(),
          Double.valueOf(m.get("split")), Double.valueOf(m.get("val")));
        try {
          if (idAndEPKey.left().startsWith("vertex")) {
            final IRVertex v = dag.getVertexById(idAndEPKey.left());
            // Remember the currently set property of a compatible class so it can be restored.
            // NOTE(review): this is null when no such property was set before; confirm that
            // setProperty tolerates null, or remove the property on rollback instead.
            final VertexExecutionProperty<?> originalEP = v.getExecutionProperties().stream()
              .filter(ep -> ep.getClass().isAssignableFrom(newEP.getClass())).findFirst().orElse(null);
            v.setProperty((VertexExecutionProperty) newEP);
            if (!dag.checkIntegrity().isPassed()) {
              // The new property broke the DAG integrity: roll back.
              v.setProperty(originalEP);
            }
          } else if (idAndEPKey.left().startsWith("edge")) {
            final IREdge e = dag.getEdgeById(idAndEPKey.left());
            // Same remember-and-restore handling as for vertices (same null caveat applies).
            final EdgeExecutionProperty<?> originalEP = e.getExecutionProperties().stream()
              .filter(ep -> ep.getClass().isAssignableFrom(newEP.getClass())).findFirst().orElse(null);
            e.setProperty((EdgeExecutionProperty) newEP);
            if (!dag.checkIntegrity().isPassed()) {
              // The new property broke the DAG integrity: roll back.
              e.setProperty(originalEP);
            }
          }
        } catch (IllegalVertexOperationException | IllegalEdgeOperationException ex) {
          // Best effort: a rejected tuning must not abort the pass, but it should leave a trace.
          LOG.warn("Skipping tuning of {} for {}: {}",
            idAndEPKey.right(), idAndEPKey.left(), ex.getMessage(), ex);
        }
      });
  } catch (final InvalidParameterException e) {
    LOG.warn(e.getMessage());
    return dag;
  } catch (final Exception e) {
    throw new CompileTimeOptimizationException(e);
  }
  return dag;
}