private Observable deltas()

in mantis-remote-observable/src/main/java/io/reactivex/mantis/remote/observable/reconciliator/Reconciliator.java [80:146]


    private Observable<EndpointChange> deltas() {

        final Map<String, Endpoint> sideEffectState = new HashMap<String, Endpoint>();
        final PublishSubject<Integer> stopReconciliator = PublishSubject.create();

        return
                Observable.merge(
                        reconciledChanges
                                .takeUntil(stopReconciliator)
                                .doOnCompleted(() -> {
                                    logger.info("onComplete triggered for reconciledChanges");
                                })
                                .doOnError(e -> logger.error("caught exception for reconciledChanges {}", e.getMessage(), e))
                        ,
                        injector
                                .deltas()
                                .doOnCompleted(new Action0() {
                                    @Override
                                    public void call() {
                                        // injector has completed recieving updates, complete reconciliator
                                        // observable
                                        logger.info("Stopping reconciliator, injector completed.");
                                        stopReconciliator.onNext(1);
                                        stopReconciliation();
                                    }
                                })
                                .doOnError(e -> logger.error("caught exception for injector deltas {}", e.getMessage(), e))
                                .doOnNext(new Action1<EndpointChange>() {
                                    @Override
                                    public void call(EndpointChange newEndpointChange) {
                                        String id = Endpoint.uniqueHost(newEndpointChange.getEndpoint().getHost(),
                                                newEndpointChange.getEndpoint().getPort(), newEndpointChange.getEndpoint().getSlotId());
                                        if (sideEffectState.containsKey(id)) {
                                            if (newEndpointChange.getType() == Type.complete) {
                                                // remove from expecected set
                                                expectedSetSize.decrement();
                                                sideEffectState.remove(id);
                                                currentExpectedSet.onNext(new HashSet<Endpoint>(sideEffectState.values()));
                                            }
                                        } else {
                                            if (newEndpointChange.getType() == Type.add) {
                                                expectedSetSize.increment();
                                                sideEffectState.put(id, new Endpoint(newEndpointChange.getEndpoint().getHost(),
                                                        newEndpointChange.getEndpoint().getPort(), newEndpointChange.getEndpoint().getSlotId()));
                                                currentExpectedSet.onNext(new HashSet<Endpoint>(sideEffectState.values()));
                                            }
                                        }
                                    }
                                })
                )
                        .doOnError(t -> logger.error("caught error processing reconciliator deltas {}", t.getMessage(), t))
                        .doOnSubscribe(
                                new Action0() {
                                    @Override
                                    public void call() {
                                        logger.info("Subscribed to deltas for {}, clearing active connection set", name);
                                        connectionSet.resetActiveConnections();
                                        startReconciliation();
                                    }
                                })
                        .doOnUnsubscribe(new Action0() {
                            @Override
                            public void call() {
                                logger.info("Unsubscribed from deltas for {}", name);
                            }
                        });
    }