in mantis-remote-observable/src/main/java/io/reactivex/mantis/remote/observable/reconciliator/Reconciliator.java [80:146]
private Observable<EndpointChange> deltas() {
final Map<String, Endpoint> sideEffectState = new HashMap<String, Endpoint>();
final PublishSubject<Integer> stopReconciliator = PublishSubject.create();
return
Observable.merge(
reconciledChanges
.takeUntil(stopReconciliator)
.doOnCompleted(() -> {
logger.info("onComplete triggered for reconciledChanges");
})
.doOnError(e -> logger.error("caught exception for reconciledChanges {}", e.getMessage(), e))
,
injector
.deltas()
.doOnCompleted(new Action0() {
@Override
public void call() {
// injector has completed recieving updates, complete reconciliator
// observable
logger.info("Stopping reconciliator, injector completed.");
stopReconciliator.onNext(1);
stopReconciliation();
}
})
.doOnError(e -> logger.error("caught exception for injector deltas {}", e.getMessage(), e))
.doOnNext(new Action1<EndpointChange>() {
@Override
public void call(EndpointChange newEndpointChange) {
String id = Endpoint.uniqueHost(newEndpointChange.getEndpoint().getHost(),
newEndpointChange.getEndpoint().getPort(), newEndpointChange.getEndpoint().getSlotId());
if (sideEffectState.containsKey(id)) {
if (newEndpointChange.getType() == Type.complete) {
// remove from expecected set
expectedSetSize.decrement();
sideEffectState.remove(id);
currentExpectedSet.onNext(new HashSet<Endpoint>(sideEffectState.values()));
}
} else {
if (newEndpointChange.getType() == Type.add) {
expectedSetSize.increment();
sideEffectState.put(id, new Endpoint(newEndpointChange.getEndpoint().getHost(),
newEndpointChange.getEndpoint().getPort(), newEndpointChange.getEndpoint().getSlotId()));
currentExpectedSet.onNext(new HashSet<Endpoint>(sideEffectState.values()));
}
}
}
})
)
.doOnError(t -> logger.error("caught error processing reconciliator deltas {}", t.getMessage(), t))
.doOnSubscribe(
new Action0() {
@Override
public void call() {
logger.info("Subscribed to deltas for {}, clearing active connection set", name);
connectionSet.resetActiveConnections();
startReconciliation();
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
logger.info("Unsubscribed from deltas for {}", name);
}
});
}