in modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverStoreV2.java [98:146]
/**
 * Reads the JSON observer configuration stored at {@code CONFIG_FLUO_OBSERVERS2} and exposes it
 * as a {@link RegisteredObservers}.
 *
 * <p>The observed columns found in the configuration are split into two immutable sets, one per
 * {@link NotificationType}, which the returned object serves from
 * {@code getObservedColumns(NotificationType)} and passes to every {@code ObserversV2} it creates.
 *
 * @param curator client used to read the configuration node
 * @return the registered observers, or {@code null} when the configuration node does not exist
 * @throws IllegalStateException if the node content produces no configuration object, or a column
 *         is mapped to a notification type other than STRONG or WEAK
 * @throws Exception any other failure raised while reading the node
 */
public RegisteredObservers load(CuratorFramework curator) throws Exception {
  byte[] data;
  try {
    data = curator.getData().forPath(CONFIG_FLUO_OBSERVERS2);
  } catch (NoNodeException nne) {
    // An absent node is reported to the caller as null rather than as an error.
    return null;
  }

  String json = new String(data, UTF_8);
  final JsonObservers jco = new Gson().fromJson(json, JsonObservers.class);
  if (jco == null) {
    // Gson yields null for empty input; fail with the path instead of a bare NPE further down.
    throw new IllegalStateException(
        "No observer configuration could be parsed from " + CONFIG_FLUO_OBSERVERS2);
  }

  ImmutableSet.Builder<Column> weakColumnsBuilder = new ImmutableSet.Builder<>();
  ImmutableSet.Builder<Column> strongColumnsBuilder = new ImmutableSet.Builder<>();

  // Partition every observed column by the kind of notification it is registered for.
  for (Entry<Column, NotificationType> entry : jco.getObservedColumns().entrySet()) {
    switch (entry.getValue()) {
      case STRONG:
        strongColumnsBuilder.add(entry.getKey());
        break;
      case WEAK:
        weakColumnsBuilder.add(entry.getKey());
        break;
      default:
        throw new IllegalStateException("Unknown notification type " + entry.getValue());
    }
  }

  // NOTE(review): the previous code assigned these two names without a visible declaration.
  // They are declared here as final locals so the anonymous class below captures an immutable
  // snapshot; confirm no field of the same name was meant to be updated instead.
  final ImmutableSet<Column> strongColumns = strongColumnsBuilder.build();
  final ImmutableSet<Column> weakColumns = weakColumnsBuilder.build();

  return new RegisteredObservers() {
    @Override
    public Observers getObservers(Environment env) {
      return new ObserversV2(env, jco, strongColumns, weakColumns);
    }

    @Override
    public Set<Column> getObservedColumns(NotificationType nt) {
      switch (nt) {
        case STRONG:
          return strongColumns;
        case WEAK:
          return weakColumns;
        default:
          throw new IllegalArgumentException("Unknown notification type " + nt);
      }
    }
  };
}