in modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverStoreV1.java [65:105]
/**
 * Instantiates and initializes every observer named in the configuration, groups the
 * specifications by the notification type of the column each observer reports, and passes the
 * two resulting maps to {@code updateObservers}.
 *
 * <p>
 * Observers are created reflectively through their no-argument constructor. A class that does
 * not implement {@code Observer} makes {@code asSubclass} throw {@link ClassCastException},
 * which is not wrapped here.
 *
 * @param curator passed through unchanged to {@code updateObservers}
 * @param config source of the observer specifications and of the application configuration
 *        handed to each observer's {@code init}
 * @throws FluoException if an observer class cannot be found, cannot be instantiated (including
 *         a missing no-argument constructor or a constructor that throws), or fails in
 *         {@code init}
 * @throws Exception declared by the signature for anything else raised while updating
 */
public void update(CuratorFramework curator, FluoConfiguration config) throws Exception {
  Collection<org.apache.fluo.api.config.ObserverSpecification> obsSpecs =
      config.getObserverSpecifications();

  Map<Column, org.apache.fluo.api.config.ObserverSpecification> colObservers = new HashMap<>();
  Map<Column, org.apache.fluo.api.config.ObserverSpecification> weakObservers = new HashMap<>();

  for (org.apache.fluo.api.config.ObserverSpecification ospec : obsSpecs) {
    Observer observer;
    try {
      // Constructor.newInstance wraps constructor failures in InvocationTargetException, unlike
      // the deprecated Class.newInstance which rethrows them (even checked ones) unwrapped.
      observer = Class.forName(ospec.getClassName()).asSubclass(Observer.class)
          .getDeclaredConstructor().newInstance();
    } catch (ClassNotFoundException e1) {
      throw new FluoException("Observer class '" + ospec.getClassName() + "' was not "
          + "found. Check for class name misspellings or failure to include "
          + "the observer jar.", e1);
    } catch (ReflectiveOperationException e2) {
      // Covers InstantiationException, IllegalAccessException, NoSuchMethodException and
      // InvocationTargetException; the original exception is kept as the cause.
      throw new FluoException(
          "Observer class '" + ospec.getClassName() + "' could not be created.", e2);
    }

    SimpleConfiguration oc = ospec.getConfiguration();
    logger.info("Setting up observer {} using params {}.", observer.getClass().getSimpleName(),
        oc.toMap());
    try {
      observer.init(new ObserverContext(config.getAppConfiguration(), oc));
    } catch (Exception e) {
      throw new FluoException("Observer '" + ospec.getClassName() + "' could not be initialized",
          e);
    }

    // The observer is only needed to learn which column it observes; the specification, not
    // the instance, is what gets stored.
    org.apache.fluo.api.observer.Observer.ObservedColumn observedCol =
        observer.getObservedColumn();
    // Every type other than STRONG is filed under the weak map. A later specification that
    // reports the same column replaces the earlier entry in that map.
    if (observedCol.getType() == NotificationType.STRONG) {
      colObservers.put(observedCol.getColumn(), ospec);
    } else {
      weakObservers.put(observedCol.getColumn(), ospec);
    }
  }

  updateObservers(curator, colObservers, weakObservers);
}