private void applySlottingSideEffectToObservable()

in mantis-remote-observable/src/main/java/io/reactivex/mantis/remote/observable/ServeGroupedObservable.java [82:199]


    /**
     * Wires the grouped source observable to the slotting strategy.
     *
     * <p>Each inner {@code GroupedObservable} is materialized into notifications, buffered in
     * windows of {@code groupBufferTimeMSec} milliseconds, and converted to lists of
     * {@code Group} objects carrying the group's key and encoded key bytes. The merged stream
     * of lists is then written, one group at a time, to {@code slottingStrategy}; completion
     * and errors of the merged stream are forwarded to all connections.
     *
     * <p>The merged stream is not subscribed immediately. A subscription is made the first time
     * the number of added connections reaches the latest value emitted by
     * {@code minConnectionsToSubscribe}, and it is released when the slotting strategy reports
     * that the last connection was removed.
     *
     * @param o                         source of observables of grouped observables to serve
     * @param minConnectionsToSubscribe stream of the minimum connection count required before
     *                                  subscribing; the most recently emitted value is used
     *                                  (0 until the first emission)
     */
    private void applySlottingSideEffectToObservable(
            Observable<Observable<GroupedObservable<String, V>>> o,
            final Observable<Integer> minConnectionsToSubscribe) {

        // Track the latest minimum-connections threshold; starts at 0 until the first emission.
        final AtomicInteger currentMinConnectionsToSubscribe = new AtomicInteger();
        minConnectionsToSubscribe
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer t1) {
                        currentMinConnectionsToSubscribe.set(t1);
                    }
                });

        Observable<Observable<List<Group<String, V>>>> listOfGroups = o
                .map(new Func1<Observable<GroupedObservable<String, V>>, Observable<List<Group<String, V>>>>() {
                    @Override
                    public Observable<List<Group<String, V>>> call(
                            Observable<GroupedObservable<String, V>> og) {
                        return og.flatMap(new Func1<GroupedObservable<String, V>, Observable<List<Group<String, V>>>>() {
                            @Override
                            public Observable<List<Group<String, V>>> call(
                                    final GroupedObservable<String, V> group) {
                                // Encode the key once per group; it is reused for every notification.
                                final String keyValue = group.getKey();
                                final byte[] keyBytes = keyEncoder.encode(keyValue);
                                return group
                                        // NOTE(review): an earlier comment here said this hook was disabled
                                        // because it caused an NPE in merge (reportedly fixed in RxJava 1.0).
                                        // The hook is active in this code; confirm that is intended.
                                        .doOnUnsubscribe(new Action0() {
                                            @Override
                                            public void call() {
                                                groupsExpiredCounter.increment();
                                            }
                                        })
                                        // A group that stays silent for expiryInSecs is switched to an
                                        // empty observable, i.e. it completes instead of erroring.
                                        .timeout(expiryInSecs, TimeUnit.SECONDS, Observable.<V>empty())
                                        // Turn items and terminal events into Notification objects so
                                        // they can be written to slots like regular data.
                                        .materialize()
                                        // Presumably removes backpressure handling for this group stream
                                        // (judging by the operator's name) -- TODO confirm.
                                        .lift(new DisableBackPressureOperator<Notification<V>>())
                                        .buffer(groupBufferTimeMSec, TimeUnit.MILLISECONDS)
                                        // Time-based buffering emits empty lists for idle windows; drop them.
                                        .filter(new Func1<List<Notification<V>>, Boolean>() {
                                            @Override
                                            public Boolean call(List<Notification<V>> t1) {
                                                return t1 != null && !t1.isEmpty();
                                            }
                                        })
                                        .map(new Func1<List<Notification<V>>, List<Group<String, V>>>() {
                                            @Override
                                            public List<Group<String, V>> call(List<Notification<V>> notifications) {
                                                List<Group<String, V>> groups = new ArrayList<>(notifications.size());
                                                for (Notification<V> notification : notifications) {
                                                    groups.add(new Group<String, V>(keyValue, keyBytes, notification));
                                                }
                                                return groups;
                                            }
                                        });
                            }
                        });
                    }
                });

        final Observable<List<Group<String, V>>> withSideEffects =
                Observable
                        .merge(listOfGroups)
                        .doOnEach(new Observer<List<Group<String, V>>>() {
                            @Override
                            public void onCompleted() {
                                slottingStrategy.completeAllConnections();
                            }

                            @Override
                            public void onError(Throwable e) {
                                // Route the failure through the logger rather than stderr.
                                logger.error("Error in grouped observable stream, propagating to all connections", e);
                                slottingStrategy.errorAllConnections(e);
                            }

                            @Override
                            public void onNext(List<Group<String, V>> groups) {
                                for (Group<String, V> group : groups) {
                                    slottingStrategy.writeOnSlot(group.getKeyBytes(), group);
                                }
                            }
                        });

        final MutableReference<Subscription> subscriptionRef = new MutableReference<>();
        final AtomicInteger connectionCount = new AtomicInteger(0);
        final AtomicBoolean isSubscribed = new AtomicBoolean();

        slottingStrategy.registerDoOnEachConnectionAdded(new Action0() {
            @Override
            public void call() {
                int minNeeded = currentMinConnectionsToSubscribe.get();
                int current = connectionCount.incrementAndGet();
                if (current >= minNeeded) {
                    // compareAndSet ensures only one caller performs the subscription.
                    if (isSubscribed.compareAndSet(false, true)) {
                        logger.info("MinConnectionsToSubscribe: " + minNeeded + ", has been met, subscribing to observable, current connection count: " + current);
                        subscriptionRef.setValue(withSideEffects.subscribe());
                    }
                } else {
                    logger.info("MinConnectionsToSubscribe: " + minNeeded + ", has NOT been met, current connection count: " + current);
                }
            }
        });

        slottingStrategy.registerDoAfterLastConnectionRemoved(new Action0() {
            @Override
            public void call() {
                // The subscription only exists once the minimum connection count was reached;
                // if connections came and went below that threshold there is nothing to release.
                Subscription active = subscriptionRef.getValue();
                if (active != null) {
                    active.unsubscribe();
                    subscriptionRef.setValue(null);
                }
                logger.info("All connections deregistered, unsubscribed to observable, resetting current connection count: 0");
                connectionCount.set(0);
                isSubscribed.set(false);
            }
        });

    }