mantis-network/src/main/java/io/reactivex/mantis/network/push/ObservableTrigger.java [209:238]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                        (Throwable e) -> {
                            logger.warn("Observable used to push data errored, on server with name: " + name, e);
                            if (doOnError != null) {
                                doOnError.call(e);
                            }
                        },
                        () -> {
                            logger.info("Observable used to push data completed, on server with name: " + name);
                            if (doOnComplete != null) {
                                doOnComplete.call();
                            }
                        }
                    )
            );
            if (oldSub != null) {
                logger.info("A new subscription is ACTIVE. " +
                    "Unsubscribe from previous subscription observable trigger with name: " + name);
                oldSub.unsubscribe();
            }
        };

        // Stop callback handed to the PushTrigger below. It deliberately leaves the
        // current subscription held in subRef untouched and only logs a warning;
        // the subscription is unsubscribed elsewhere, once a newer one is ACTIVE.
        Action1<MonitoredQueue<KeyValuePair<K, V>>> doOnStop = ignoredQueue -> {
            if (subRef.get() == null) {
                // No subscription has been recorded, so there is nothing to report.
                return;
            }
            logger.warn("Connections from next stage has dropped to 0. " +
                "Do not propagate unsubscribe until a new connection is made.");
        };

        return new PushTrigger<>(doOnStart, doOnStop, metrics);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



mantis-network/src/main/java/io/reactivex/mantis/network/push/ObservableTrigger.java [274:303]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                        (Throwable e) -> {
                            logger.warn("Observable used to push data errored, on server with name: " + name, e);
                            if (doOnError != null) {
                                doOnError.call(e);
                            }
                        },
                        () -> {
                            logger.info("Observable used to push data completed, on server with name: " + name);
                            if (doOnComplete != null) {
                                doOnComplete.call();
                            }
                        }
                    )
            );
            if (oldSub != null) {
                logger.info("A new subscription is ACTIVE. " +
                    "Unsubscribe from previous subscription observable trigger with name: " + name);
                oldSub.unsubscribe();
            }
        };

        // Stop callback handed to the PushTrigger below. The unsubscribe is
        // intentionally NOT propagated here: the subscription stored in subRef is
        // kept as-is, and a warning is logged instead.
        Action1<MonitoredQueue<KeyValuePair<K, V>>> doOnStop = ignoredQueue -> {
            if (subRef.get() == null) {
                // Nothing is subscribed yet; stay silent.
                return;
            }
            logger.warn("Connections from next stage has dropped to 0. " +
                "Do not propagate unsubscribe until a new connection is made.");
        };

        return new PushTrigger<>(doOnStart, doOnStop, metrics);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



