in mantis-remote-observable/src/main/java/io/reactivex/mantis/remote/observable/ServeGroupedObservable.java [82:199]
/**
 * Wires the grouped source observable to the slotting strategy and ties the
 * lifetime of the upstream subscription to the number of registered connections.
 *
 * <p>Each inner {@link GroupedObservable} is materialized, buffered for
 * {@code groupBufferTimeMSec} milliseconds and converted into a list of
 * {@code Group} objects carrying the group key, its encoded bytes and the
 * notification. The merged stream writes every group through
 * {@code slottingStrategy.writeOnSlot} and forwards completion and errors to
 * all connections.
 *
 * <p>The merged stream is subscribed lazily: only once the number of added
 * connections reaches the latest value emitted by
 * {@code minConnectionsToSubscribe}. It is unsubscribed again after the last
 * connection has been removed.
 *
 * @param o                         source of observables of grouped observables keyed by {@code String}
 * @param minConnectionsToSubscribe emits the minimum number of connections required before the
 *                                  source is subscribed; the threshold is 0 until the first value arrives
 */
private void applySlottingSideEffectToObservable(
        Observable<Observable<GroupedObservable<String, V>>> o,
        final Observable<Integer> minConnectionsToSubscribe) {

    // Track the most recently emitted threshold; starts at 0 (AtomicInteger default).
    final AtomicInteger currentMinConnectionsToSubscribe = new AtomicInteger();
    minConnectionsToSubscribe
            .subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer t1) {
                    currentMinConnectionsToSubscribe.set(t1);
                }
            });

    Observable<Observable<List<Group<String, V>>>> listOfGroups = o
            .map(new Func1<Observable<GroupedObservable<String, V>>, Observable<List<Group<String, V>>>>() {
                @Override
                public Observable<List<Group<String, V>>> call(
                        Observable<GroupedObservable<String, V>> og) {
                    return og
                            .flatMap(new Func1<GroupedObservable<String, V>, Observable<List<Group<String, V>>>>() {
                                @Override
                                public Observable<List<Group<String, V>>> call(
                                        final GroupedObservable<String, V> group) {
                                    // Encode the key once per group; it is reused for every notification.
                                    final byte[] keyBytes = keyEncoder.encode(group.getKey());
                                    final String keyValue = group.getKey();
                                    return group
                                            // NOTE(review): an older comment here said this hook was
                                            // commented out because it caused an NPE in merge
                                            // (supposedly fixed in RxJava 1.0). The hook is active:
                                            // the counter is incremented whenever the group is unsubscribed.
                                            .doOnUnsubscribe(new Action0() {
                                                @Override
                                                public void call() {
                                                    groupsExpiredCounter.increment();
                                                }
                                            })
                                            // Switch to an empty observable (i.e. complete the group)
                                            // when no item arrives within expiryInSecs.
                                            .timeout(expiryInSecs, TimeUnit.SECONDS, Observable.<V>empty())
                                            // Materialize so that terminal events are carried as data.
                                            .materialize()
                                            .lift(new DisableBackPressureOperator<Notification<V>>())
                                            .buffer(groupBufferTimeMSec, TimeUnit.MILLISECONDS)
                                            // Time-based buffering emits empty lists for idle windows; drop them.
                                            .filter(new Func1<List<Notification<V>>, Boolean>() {
                                                @Override
                                                public Boolean call(List<Notification<V>> t1) {
                                                    return t1 != null && !t1.isEmpty();
                                                }
                                            })
                                            .map(new Func1<List<Notification<V>>, List<Group<String, V>>>() {
                                                @Override
                                                public List<Group<String, V>> call(List<Notification<V>> notifications) {
                                                    List<Group<String, V>> groups = new ArrayList<>(notifications.size());
                                                    for (Notification<V> notification : notifications) {
                                                        groups.add(new Group<String, V>(keyValue, keyBytes, notification));
                                                    }
                                                    return groups;
                                                }
                                            });
                                }
                            });
                }
            });

    final Observable<List<Group<String, V>>> withSideEffects =
            Observable
                    .merge(listOfGroups)
                    .doOnEach(new Observer<List<Group<String, V>>>() {
                        @Override
                        public void onCompleted() {
                            slottingStrategy.completeAllConnections();
                        }

                        @Override
                        public void onError(Throwable e) {
                            // Route through the logger instead of printing to stderr.
                            logger.error("Error in grouped observable, propagating error to all connections", e);
                            slottingStrategy.errorAllConnections(e);
                        }

                        @Override
                        public void onNext(List<Group<String, V>> listOfGroups) {
                            for (Group<String, V> group : listOfGroups) {
                                slottingStrategy.writeOnSlot(group.getKeyBytes(), group);
                            }
                        }
                    });

    final MutableReference<Subscription> subscriptionRef = new MutableReference<>();
    final AtomicInteger connectionCount = new AtomicInteger(0);
    final AtomicBoolean isSubscribed = new AtomicBoolean();

    slottingStrategy.registerDoOnEachConnectionAdded(new Action0() {
        @Override
        public void call() {
            int minNeeded = currentMinConnectionsToSubscribe.get();
            int current = connectionCount.incrementAndGet();
            if (current >= minNeeded) {
                // compareAndSet guarantees a single subscription even if several
                // connections cross the threshold.
                if (isSubscribed.compareAndSet(false, true)) {
                    logger.info("MinConnectionsToSubscribe: " + minNeeded + ", has been met, subscribing to observable, current connection count: " + current);
                    subscriptionRef.setValue(withSideEffects.subscribe());
                }
            } else {
                logger.info("MinConnectionsToSubscribe: " + minNeeded + ", has NOT been met, current connection count: " + current);
            }
        }
    });

    slottingStrategy.registerDoAfterLastConnectionRemoved(new Action0() {
        @Override
        public void call() {
            // A subscription is only stored once the minimum connection count has
            // been reached. If every connection leaves before that, there is nothing
            // to unsubscribe; guard against dereferencing a missing subscription so
            // the counters below are still reset.
            Subscription activeSubscription = subscriptionRef.getValue();
            if (activeSubscription != null) {
                activeSubscription.unsubscribe();
            }
            logger.info("All connections deregistered, unsubscribed to observable, resetting current connection count: 0");
            connectionCount.set(0);
            isSubscribed.set(false);
        }
    });
}