in storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverConfig.java [71:168]
/**
 * Populates {@code incompatibleComponentSets} and {@code maxNodeCoLocationCnts} from the topology configuration.
 *
 * <p>{@link Config#TOPOLOGY_RAS_CONSTRAINTS} is read first and may be either an old-style list of component
 * pairs or a new-style map keyed by component. {@link Config#TOPOLOGY_SPREAD_COMPONENTS} is read afterwards and
 * only sets a co-location count of 1 for components that do not have one yet. Entries that cannot be
 * interpreted are logged at WARN level and skipped.
 */
private void computeComponentConstraints() {
    // make sure every known component has a (possibly empty) incompatibility set before constraints are added
    comps.forEach(k -> incompatibleComponentSets.computeIfAbsent(k, x -> new HashSet<>()));

    Object rasConstraints = topoConf.get(Config.TOPOLOGY_RAS_CONSTRAINTS);
    if (rasConstraints == null) {
        LOG.warn("TopoId {}: No config supplied for {}", topoId, Config.TOPOLOGY_RAS_CONSTRAINTS);
    } else if (rasConstraints instanceof List) {
        // old style
        addOldStyleConstraints((List<?>) rasConstraints);
    } else if (rasConstraints instanceof Map) {
        // new style; keys are treated as component names, exactly as the cast in the original code did
        @SuppressWarnings("unchecked")
        Map<String, ?> constraintMap = (Map<String, ?>) rasConstraints;
        addNewStyleConstraints(constraintMap);
    } else {
        // neither a List nor a Map: ignore it the same way an invalid spread config is ignored
        LOG.warn("TopoId {}: Ignoring invalid {} config={}", topoId, Config.TOPOLOGY_RAS_CONSTRAINTS, rasConstraints);
    }

    // process Config.TOPOLOGY_SPREAD_COMPONENTS - old style
    // override only if not defined already using Config.TOPOLOGY_RAS_CONSTRAINTS above
    applySpreadComponents(topoConf.get(Config.TOPOLOGY_SPREAD_COMPONENTS));
}

/**
 * Handles the old-style constraint list, where each entry is a pair of mutually incompatible components.
 *
 * @param constraints raw list value of {@link Config#TOPOLOGY_RAS_CONSTRAINTS}
 */
private void addOldStyleConstraints(List<?> constraints) {
    for (Object entry : constraints) {
        // a pair needs two elements; skip anything shorter instead of failing on get(0)/get(1)
        if (!(entry instanceof List) || ((List<?>) entry).size() < 2) {
            LOG.warn("TopoId {}: Ignoring invalid constraint {} declared in {}, expecting a pair of Comps",
                topoId, entry, Config.TOPOLOGY_RAS_CONSTRAINTS);
            continue;
        }
        @SuppressWarnings("unchecked")
        List<String> constraintPair = (List<String>) entry;
        String comp1 = constraintPair.get(0);
        String comp2 = constraintPair.get(1);
        if (!comps.contains(comp1)) {
            LOG.warn("TopoId {}: Comp {} declared in constraints is not valid!", topoId, comp1);
            continue;
        }
        if (!comps.contains(comp2)) {
            LOG.warn("TopoId {}: Comp {} declared in constraints is not valid!", topoId, comp2);
            continue;
        }
        markIncompatible(comp1, comp2);
    }
}

/**
 * Handles the new-style constraint map: component name to a map of constraint type to constraint value.
 *
 * @param constraintMap map value of {@link Config#TOPOLOGY_RAS_CONSTRAINTS}
 */
private void addNewStyleConstraints(Map<String, ?> constraintMap) {
    for (Map.Entry<String, ?> entry : constraintMap.entrySet()) {
        String comp1 = entry.getKey();
        if (!comps.contains(comp1)) {
            LOG.warn("TopoId {}: Component {} is not a valid component", topoId, comp1);
            continue;
        }
        Object constraintsForComp = entry.getValue();
        if (!(constraintsForComp instanceof Map)) {
            // null or wrongly typed value: nothing to iterate over
            LOG.warn("TopoId {}: Constraints {} declared for Comp {} are not valid, expecting a map of constraint type to value",
                topoId, constraintsForComp, comp1);
            continue;
        }
        for (Map.Entry<?, ?> typed : ((Map<?, ?>) constraintsForComp).entrySet()) {
            applyConstraint(comp1, typed.getKey(), typed.getValue());
        }
    }
}

/**
 * Applies one new-style constraint to a component that is already known to be valid.
 *
 * @param comp1      component the constraint is declared for
 * @param ctype      constraint type key as found in the config
 * @param constraint raw constraint value as found in the config
 */
private void applyConstraint(String comp1, Object ctype, Object constraint) {
    // constant-first equals() so a null or non-String key falls through to the "invalid type" warning
    if (CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT.equals(ctype)) {
        int numValue;
        try {
            numValue = Integer.parseInt("" + constraint);
        } catch (NumberFormatException ex) {
            LOG.warn("TopoId {}: {} {} declared for Comp {} is not valid, expected >= 1",
                topoId, ctype, constraint, comp1);
            return;
        }
        if (numValue < 1) {
            LOG.warn("TopoId {}: {} {} declared for Comp {} is not valid, expected >= 1",
                topoId, ctype, numValue, comp1);
        } else {
            maxNodeCoLocationCnts.put(comp1, numValue);
        }
    } else if (CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS.equals(ctype)) {
        if (!(constraint instanceof List || constraint instanceof String)) {
            LOG.warn("TopoId {}: {} {} declared for Comp {} is not valid, expecting a list of Comps or 1 Comp",
                topoId, ctype, constraint, comp1);
            return;
        }
        // a single component name is accepted as shorthand for a one-element list
        @SuppressWarnings("unchecked")
        List<String> list = (constraint instanceof String) ? Arrays.asList((String) constraint) : (List<String>) constraint;
        for (String comp2 : list) {
            if (!comps.contains(comp2)) {
                LOG.warn("TopoId {}: {} {} declared for Comp {} is not a valid Comp", topoId, ctype, comp2, comp1);
                continue;
            }
            markIncompatible(comp1, comp2);
        }
    } else {
        LOG.warn("TopoId {}: ConstraintType={} invalid for Comp={}, valid values are {} and {}, ignoring value={}",
            topoId, ctype, comp1, CONSTRAINT_TYPE_MAX_NODE_CO_LOCATION_CNT,
            CONSTRAINT_TYPE_INCOMPATIBLE_COMPONENTS, constraint);
    }
}

/**
 * Records that two valid components are incompatible with each other, in both directions.
 */
private void markIncompatible(String comp1, String comp2) {
    incompatibleComponentSets.get(comp1).add(comp2);
    incompatibleComponentSets.get(comp2).add(comp1);
}

/**
 * Applies the old-style spread list: each listed valid component gets a max node co-location count of 1,
 * unless a count was already set for it.
 *
 * @param obj raw value of {@link Config#TOPOLOGY_SPREAD_COMPONENTS}, may be null
 */
private void applySpreadComponents(Object obj) {
    if (obj == null) {
        return;
    }
    if (!(obj instanceof List)) {
        LOG.warn("TopoId {}: Ignoring invalid {} config={}", topoId, Config.TOPOLOGY_SPREAD_COMPONENTS, obj);
        return;
    }
    @SuppressWarnings("unchecked")
    List<String> spread = (List<String>) obj;
    for (String comp : spread) {
        if (!comps.contains(comp)) {
            LOG.warn("TopoId {}: Invalid Component {} declared in spread {}", topoId, comp, spread);
            continue;
        }
        if (maxNodeCoLocationCnts.containsKey(comp)) {
            LOG.warn("TopoId {}: Component {} maxNodeCoLocationCnt={} already defined in {}, ignoring spread config in {}", topoId,
                comp, maxNodeCoLocationCnts.get(comp), Config.TOPOLOGY_RAS_CONSTRAINTS, Config.TOPOLOGY_SPREAD_COMPONENTS);
            continue;
        }
        maxNodeCoLocationCnts.put(comp, 1);
    }
}