private Subscription serveGroupedObservable()

in mantis-remote-observable/src/main/java/io/reactivex/mantis/remote/observable/RemoteObservableConnectionHandler.java [213:314]


    private <K, V> Subscription serveGroupedObservable(
            final Observable<Group<K, V>> groups,
            final ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> connection,
            final RemoteRxEvent event,
            final Func1<Map<String, String>, Func1<K, Boolean>> filterFunction,
            final Encoder<K> keyEncoder,
            final Encoder<V> valueEncoder,
            final ServeGroupedObservable<K, V> serveConfig,
            final WritableEndpoint<GroupedObservable<K, V>> endpoint) {

        final MutableReference<Subscription> subReference = new MutableReference<>();
        subReference.setValue(groups
                // filter out groups based on subscription parameters
                .filter(new Func1<Group<K, V>, Boolean>() {
                    @Override
                    public Boolean call(Group<K, V> group) {
                        return filterFunction.call(event.getSubscribeParameters())
                                .call(group.getKeyValue());
                    }
                })
                .doOnCompleted(new Action0() {
                    @Override
                    public void call() {
                        logger.info("OnCompleted recieved in serveGroupedObservable, sending to client.");
                    }
                })
                .doOnError(new Action1<Throwable>() {
                    @Override
                    public void call(Throwable t1) {
                        logger.info("OnError received in serveGroupedObservable, sending to client: ", t1);
                    }
                })
                .materialize()
                .lift(new DisableBackPressureOperator<Notification<Group<K, V>>>())
                .buffer(writeBufferTimeMSec, TimeUnit.MILLISECONDS)
                .filter(new Func1<List<Notification<Group<K, V>>>, Boolean>() {
                    @Override
                    public Boolean call(List<Notification<Group<K, V>>> t1) {
                        return t1 != null && !t1.isEmpty();
                    }
                })
                .map(new Func1<List<Notification<Group<K, V>>>, List<RemoteRxEvent>>() {
                    @Override
                    public List<RemoteRxEvent> call(final List<Notification<Group<K, V>>> groupNotifications) {
                        List<RemoteRxEvent> rxEvents = new ArrayList<RemoteRxEvent>(groupNotifications.size());
                        for (Notification<Group<K, V>> groupNotification : groupNotifications) {
                            if (Kind.OnNext == groupNotification.getKind()) {
                                // encode inner group notification
                                Group<K, V> group = groupNotification.getValue();
                                final int keyLength = group.getKeyBytes().length;
                                Notification<V> notification = groupNotification.getValue().getNotification();
                                byte[] data = null;
                                if (Kind.OnNext == notification.getKind()) {
                                    V value = notification.getValue();
                                    byte[] valueBytes = valueEncoder.encode(value);
                                    // 1 byte for notification type,
                                    // 4 bytes is to encode key length as int
                                    data = ByteBuffer.allocate(1 + 4 + keyLength + valueBytes.length)
                                            .put((byte) 1)
                                            .putInt(keyLength)
                                            .put(group.getKeyBytes())
                                            .put(valueBytes)
                                            .array();
                                } else if (Kind.OnCompleted == notification.getKind()) {
                                    data = ByteBuffer.allocate(1 + 4 + keyLength)
                                            .put((byte) 2)
                                            .putInt(keyLength)
                                            .put(group.getKeyBytes())
                                            .array();
                                } else if (Kind.OnError == notification.getKind()) {
                                    Throwable error = notification.getThrowable();
                                    byte[] errorBytes = RemoteObservable.fromThrowableToBytes(error);
                                    data = ByteBuffer.allocate(1 + 4 + keyLength + errorBytes.length)
                                            .put((byte) 3)
                                            .putInt(keyLength)
                                            .put(group.getKeyBytes())
                                            .put(errorBytes)
                                            .array();
                                }
                                rxEvents.add(RemoteRxEvent.next(event.getName(), data));
                            } else if (Kind.OnCompleted == groupNotification.getKind()) {
                                rxEvents.add(RemoteRxEvent.completed(event.getName()));
                            } else if (Kind.OnError == groupNotification.getKind()) {
                                rxEvents.add(RemoteRxEvent.error(event.getName(),
                                        RemoteObservable.fromThrowableToBytes(groupNotification.getThrowable())));
                            } else {
                                throw new RuntimeException("Unsupported notification type: " + groupNotification.getKind());
                            }
                        }
                        return rxEvents;
                    }
                })
                .filter(new Func1<List<RemoteRxEvent>, Boolean>() {
                    @Override
                    public Boolean call(List<RemoteRxEvent> t1) {
                        return t1 != null && !t1.isEmpty();
                    }
                })
                .subscribe(new WriteBytesObserver(connection, subReference, serverMetrics,
                        serveConfig.getSlottingStrategy(), endpoint)));
        return subReference.getValue();
    }