private Subscription serveObservable()

in mantis-remote-observable/src/main/java/io/reactivex/mantis/remote/observable/RemoteObservableConnectionHandler.java [80:145]


    /**
     * Subscribes to {@code observable} and hands its notifications, converted to
     * batches of {@link RemoteRxEvent}, to a {@code WriteBytesObserver} built around
     * {@code connection}.
     *
     * <p>Pipeline, in order:
     * <ol>
     *   <li>keep only items accepted by the predicate that {@code filterFunction}
     *       produces from the subscribe parameters carried by {@code event};</li>
     *   <li>log completion and error signals;</li>
     *   <li>materialize the stream so terminal signals travel as ordinary items;</li>
     *   <li>lift a {@code DisableBackPressureOperator};</li>
     *   <li>group notifications into {@code writeBufferTimeMSec}-millisecond buffers
     *       and discard empty buffers;</li>
     *   <li>translate each notification into a next / error / completed
     *       {@link RemoteRxEvent} named after {@code event.getName()};</li>
     *   <li>discard empty event lists and subscribe the writer.</li>
     * </ol>
     *
     * @param observable     source stream to serve
     * @param connection     connection passed to the {@code WriteBytesObserver}
     * @param event          subscribe event; supplies the subscribe parameters and the
     *                       name stamped on every outgoing event
     * @param filterFunction builds the per-subscriber item predicate from the
     *                       subscribe parameters
     * @param encoder        encodes each onNext value for the outgoing event
     * @param serveConfig    supplies the slotting strategy passed to the writer
     * @param endpoint       endpoint passed to the writer
     * @param <T>            item type of the served stream
     * @return the subscription held by the reference that is shared with the writer
     */
    private <T> Subscription serveObservable(
            final Observable<T> observable,
            final ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> connection,
            final RemoteRxEvent event,
            final Func1<Map<String, String>, Func1<T, Boolean>> filterFunction,
            final Encoder<T> encoder,
            final ServeObservable<T> serveConfig,
            final WritableEndpoint<Observable<T>> endpoint) {

        // Shared with the writer; it is populated only after subscribe(...) returns.
        // NOTE(review): if the writer reads this reference while subscribe(...) is still
        // running it would presumably observe an unset value - confirm against WriteBytesObserver.
        final MutableReference<Subscription> subReference = new MutableReference<>();

        // Per-subscriber item predicate derived from the subscribe parameters.
        final Func1<T, Boolean> subscriberPredicate =
                filterFunction.call(event.getSubscribeParameters());

        final Action0 logCompletion = new Action0() {
            @Override
            public void call() {
                logger.info("OnCompleted recieved in serveObservable, sending to client.");
            }
        };

        final Action1<Throwable> logFailure = new Action1<Throwable>() {
            @Override
            public void call(Throwable failure) {
                logger.info("OnError received in serveObservable, sending to client: ", failure);
            }
        };

        // One predicate serves both batch filters: it only inspects null-ness and emptiness.
        final Func1<List<?>, Boolean> nonEmptyBatch = new Func1<List<?>, Boolean>() {
            @Override
            public Boolean call(List<?> batch) {
                return !(batch == null || batch.isEmpty());
            }
        };

        // Converts one buffered batch of notifications into wire events.
        final Func1<List<Notification<T>>, List<RemoteRxEvent>> toRemoteEvents =
                new Func1<List<Notification<T>>, List<RemoteRxEvent>>() {
                    @Override
                    public List<RemoteRxEvent> call(List<Notification<T>> batch) {
                        final List<RemoteRxEvent> converted = new ArrayList<>(batch.size());
                        for (final Notification<T> item : batch) {
                            final Notification.Kind kind = item.getKind();
                            if (kind == Notification.Kind.OnNext) {
                                converted.add(RemoteRxEvent.next(
                                        event.getName(), encoder.encode(item.getValue())));
                                continue;
                            }
                            if (kind == Notification.Kind.OnError) {
                                converted.add(RemoteRxEvent.error(
                                        event.getName(),
                                        RemoteObservable.fromThrowableToBytes(item.getThrowable())));
                                continue;
                            }
                            if (kind == Notification.Kind.OnCompleted) {
                                converted.add(RemoteRxEvent.completed(event.getName()));
                                continue;
                            }
                            throw new RuntimeException("Unsupported notification kind: " + kind);
                        }
                        return converted;
                    }
                };

        final Subscription subscription = observable
                .filter(subscriberPredicate)
                .doOnCompleted(logCompletion)
                .doOnError(logFailure)
                .materialize()
                .lift(new DisableBackPressureOperator<Notification<T>>())
                .buffer(writeBufferTimeMSec, TimeUnit.MILLISECONDS)
                .filter(nonEmptyBatch)
                .map(toRemoteEvents)
                .filter(nonEmptyBatch)
                .subscribe(new WriteBytesObserver(connection, subReference, serverMetrics,
                        serveConfig.getSlottingStrategy(), endpoint));

        subReference.setValue(subscription);
        return subReference.getValue();
    }