in storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java [606:685]
// Merges the partial update carried by newElems with the StormBase currently stored for
// stormId, then serializes the merged newElems and writes it to the topology's storm path.
//
// stormId  - id of the topology whose stored StormBase is being updated.
// newElems - the update; it is mutated in place. Fields it leaves blank/null/zero are filled
//            from the stored StormBase, and its component executor/debug maps are merged with
//            the stored ones (entries in newElems win on key conflicts).
public void updateStorm(String stormId, StormBase newElems) {
    // NOTE(review): assumes a StormBase is already stored for stormId; if this lookup can return
    // null, the dereferences below fail with a NullPointerException - confirm against callers.
    StormBase stormBase = stormBase(stormId, null);

    // Component executors: start from the stored map and overlay the update. As before, nothing
    // is merged when the stored base has no executors map.
    Map<String, Integer> storedExecutors = stormBase.get_component_executors();
    if (storedExecutors != null) {
        // Copy into a fresh HashMap; the source maps may be APersistentMap, which doesn't support "put".
        Map<String, Integer> mergedExecutors = new HashMap<>(storedExecutors);
        Map<String, Integer> updatedExecutors = newElems.get_component_executors();
        // An update that carries no executors map is treated as "no changes" instead of failing.
        if (updatedExecutors != null) {
            mergedExecutors.putAll(updatedExecutors);
        }
        if (!mergedExecutors.isEmpty()) {
            newElems.set_component_executors(mergedExecutors);
        }
    }

    // Component debug options: merge per key over the union of stored and updated keys.
    // A missing (null) map on either side is treated as empty.
    Map<String, DebugOptions> oldComponentDebug = stormBase.get_component_debug();
    Map<String, DebugOptions> newComponentDebug = newElems.get_component_debug();
    // The key sets may be APersistentSet, which doesn't support addAll, so collect into a HashSet.
    Set<String> debugOptionsKeys = new HashSet<>();
    if (oldComponentDebug != null) {
        debugOptionsKeys.addAll(oldComponentDebug.keySet());
    }
    if (newComponentDebug != null) {
        debugOptionsKeys.addAll(newComponentDebug.keySet());
    }
    Map<String, DebugOptions> componentDebug = new HashMap<>();
    for (String key : debugOptionsKeys) {
        DebugOptions oldOptions = oldComponentDebug == null ? null : oldComponentDebug.get(key);
        DebugOptions newOptions = newComponentDebug == null ? null : newComponentDebug.get(key);
        boolean enable = false;
        double samplingpct = 0;
        if (oldOptions != null) {
            enable = oldOptions.is_enable();
            samplingpct = oldOptions.get_samplingpct();
        }
        if (newOptions != null) {
            enable = newOptions.is_enable();
            // A non-zero sampling percentage in the update replaces the stored one; a zero value
            // keeps the stored percentage. The two are never added together.
            if (newOptions.get_samplingpct() != 0) {
                samplingpct = newOptions.get_samplingpct();
            }
        }
        DebugOptions debugOptions = new DebugOptions();
        debugOptions.set_enable(enable);
        debugOptions.set_samplingpct(samplingpct);
        componentDebug.put(key, debugOptions);
    }
    if (!componentDebug.isEmpty()) {
        newElems.set_component_debug(componentDebug);
    }

    // Scalar fields: anything the update leaves blank, null or zero falls back to the stored value.
    if (StringUtils.isBlank(newElems.get_name())) {
        newElems.set_name(stormBase.get_name());
    }
    if (StringUtils.isBlank(newElems.get_topology_version()) && stormBase.is_set_topology_version()) {
        newElems.set_topology_version(stormBase.get_topology_version());
    }
    if (newElems.get_status() == null) {
        newElems.set_status(stormBase.get_status());
    }
    if (newElems.get_num_workers() == 0) {
        newElems.set_num_workers(stormBase.get_num_workers());
    }
    if (newElems.get_launch_time_secs() == 0) {
        newElems.set_launch_time_secs(stormBase.get_launch_time_secs());
    }
    if (StringUtils.isBlank(newElems.get_owner())) {
        newElems.set_owner(stormBase.get_owner());
    }
    if (StringUtils.isBlank(newElems.get_principal()) && stormBase.is_set_principal()) {
        newElems.set_principal(stormBase.get_principal());
    }
    if (newElems.get_topology_action_options() == null) {
        newElems.set_topology_action_options(stormBase.get_topology_action_options());
    }

    stateStorage.set_data(ClusterUtils.stormPath(stormId), Utils.serialize(newElems), defaultAcls);
}