in runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/MetricUtils.java [334:368]
/**
 * Maps the value of an execution property to an integer index.
 *
 * <p>Enum values map to their ordinal, Integer values map to themselves, and Boolean values map
 * to 1 (true) or 0 (false). Any other value is looked up among the properties registered in
 * {@code EP_METADATA}. If no registered property matches, a new index is allocated (one greater
 * than the largest index registered under {@code epKeyIndex}, or 1 if there is none), the
 * property is recorded in {@code EP_METADATA}, and {@code MUST_UPDATE_EP_METADATA} is counted
 * down.
 *
 * @param epKeyIndex index of the execution property key; used when a new value index is allocated.
 * @param ep         the execution property whose value is to be indexed.
 * @return the index representing the value of {@code ep}.
 */
static Integer valueToIndex(final Integer epKeyIndex, final ExecutionProperty<?> ep) {
  final Object o = ep.getValue();
  if (o instanceof Enum) {
    return ((Enum<?>) o).ordinal();
  } else if (o instanceof Integer) {
    return (Integer) o;
  } else if (o instanceof Boolean) {
    return ((Boolean) o) ? 1 : 0;
  }

  // Encoder/decoder factories are additionally matched by their string representation.
  final boolean matchByString = o instanceof EncoderFactory || o instanceof DecoderFactory;

  // NOTE(review): this lookup scans every registered property, not only those registered under
  // epKeyIndex, whereas new indices below are allocated per key - confirm this is intentional.
  final ExecutionProperty<? extends Serializable> ep1 = EP_METADATA.values().stream()
      .filter(ep2 -> {
        final Object registered = ep2.getValue();
        // Null-safe comparison: a property with a null value may have been registered earlier,
        // and must not break subsequent lookups with a NullPointerException.
        if (registered == null || o == null) {
          return registered == o;
        }
        return registered.equals(o)
            || (matchByString && registered.toString().equals(o.toString()));
      })
      .findFirst().orElse(null);

  if (ep1 != null) {
    return EP_METADATA.inverse().get(ep1).right();
  }

  // No match: allocate the next free value index under this key.
  final Integer valueIndex = EP_METADATA.keySet().stream()
      .filter(pair -> pair.left().equals(epKeyIndex))
      .mapToInt(Pair::right).max().orElse(0) + 1;
  // Update the metadata if new EP value has been discovered.
  EP_METADATA.put(Pair.of(epKeyIndex, valueIndex), ep);
  LOG.info("New EP Index: ({}, {}) for {}", epKeyIndex, valueIndex, ep);
  MUST_UPDATE_EP_METADATA.countDown();
  return valueIndex;
}