private Subscription serveNestedObservable()

in mantis-remote-observable/src/main/java/io/reactivex/mantis/remote/observable/RemoteObservableConnectionHandler.java [148:211]


    private <T> Subscription serveNestedObservable(
            final Observable<T> observable,
            final ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> connection,
            final RemoteRxEvent event,
            final Func1<Map<String, String>, Func1<T, Boolean>> filterFunction,
            final Encoder<T> encoder,
            final ServeNestedObservable<Observable<T>> serveConfig,
            final WritableEndpoint<Observable<T>> endpoint) {

        final MutableReference<Subscription> subReference = new MutableReference<>();
        subReference.setValue(
                observable
                        .filter(filterFunction.call(event.getSubscribeParameters()))
                        .doOnCompleted(new Action0() {
                            @Override
                            public void call() {
                                logger.info("OnCompleted recieved in serveNestedObservable, sending to client.");
                            }
                        })
                        .doOnError(new Action1<Throwable>() {
                            @Override
                            public void call(Throwable t1) {
                                logger.info("OnError received in serveNestedObservable, sending to client: ", t1);
                            }
                        })
                        .map(new Func1<T, byte[]>() {
                            @Override
                            public byte[] call(T t1) {
                                return encoder.encode(t1);
                            }
                        })
                        .materialize()
                        .map(new Func1<Notification<byte[]>, RemoteRxEvent>() {
                            @Override
                            public RemoteRxEvent call(Notification<byte[]> notification) {
                                if (notification.getKind() == Notification.Kind.OnNext) {
                                    return RemoteRxEvent.next(event.getName(), notification.getValue());
                                } else if (notification.getKind() == Notification.Kind.OnError) {
                                    return RemoteRxEvent.error(event.getName(), RemoteObservable.fromThrowableToBytes(notification.getThrowable()));
                                } else if (notification.getKind() == Notification.Kind.OnCompleted) {
                                    return RemoteRxEvent.completed(event.getName());
                                } else {
                                    throw new RuntimeException("Unsupported notification kind: " + notification.getKind());
                                }
                            }
                        })
                        .lift(new DisableBackPressureOperator<RemoteRxEvent>())
                        .buffer(writeBufferTimeMSec, TimeUnit.MILLISECONDS)
                        .filter(new Func1<List<RemoteRxEvent>, Boolean>() {
                            @Override
                            public Boolean call(List<RemoteRxEvent> t1) {
                                return t1 != null && !t1.isEmpty();
                            }
                        })
                        .filter(new Func1<List<RemoteRxEvent>, Boolean>() {
                            @Override
                            public Boolean call(List<RemoteRxEvent> t1) {
                                return t1 != null && !t1.isEmpty();
                            }
                        })
                        .subscribe(new WriteBytesObserver(connection, subReference, serverMetrics,
                                serveConfig.getSlottingStrategy(), endpoint)));
        return subReference.getValue();
    }