mantis-remote-observable/src/main/java/io/reactivex/mantis/remote/observable/RemoteObservable.java [229:281]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        final Decoder<K> keyDecoder = params.getKeyDecoder();
        final Decoder<V> valueDecoder = params.getValueDecoder();
        loadFastProperties();
        return
                RxNetty.createTcpClient(params.getHost(), params.getPort(), new PipelineConfiguratorComposite<RemoteRxEvent, List<RemoteRxEvent>>(
                        new PipelineConfigurator<RemoteRxEvent, List<RemoteRxEvent>>() {
                            // Assembles the client channel pipeline: optional Netty logging, optional
                            // idle detection + heartbeat, optional gzip compression, and finally
                            // 4-byte length-prefixed framing. Handler order matters; do not reorder.
                            @Override
                            public void configureNewPipeline(ChannelPipeline pipeline) {
                                if (enableNettyLogging) {
                                    // addFirst places the logger at the head of the pipeline
                                    pipeline.addFirst(new LoggingHandler(LogLevel.ERROR)); // active only when enableNettyLogging is set; logs at ERROR level
                                }
                                if (enableHeartBeating) {
                                    // IdleStateHandler arguments are (readerIdleSeconds, writerIdleSeconds, allIdleSeconds):
                                    // reader-idle after 10s, writer-idle after 2s, all-idle detection disabled (0)
                                    pipeline.addLast("idleStateHandler", new IdleStateHandler(10, 2, 0));
                                    // presumably reacts to the idle events raised by the handler above -- confirm in HeartbeatHandler
                                    pipeline.addLast("heartbeat", new HeartbeatHandler());
                                }
                                if (enableCompression) {
                                    // NOTE(review): the handler names look swapped relative to the handlers they label --
                                    // "gzipInflater" is registered for the encoder (compresses outbound data) and
                                    // "gzipDeflater" for the decoder (decompresses inbound data). Left unchanged because
                                    // other code may look these handlers up by name -- verify before renaming.
                                    pipeline.addLast("gzipInflater", new JdkZlibEncoder(ZlibWrapper.GZIP));
                                    pipeline.addLast("gzipDeflater", new JdkZlibDecoder(ZlibWrapper.GZIP));
                                }
                                pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); // 4 bytes to encode length
                                // Decoder arguments: length field at offset 0, 4 bytes long, no length adjustment,
                                // and the 4 length bytes are stripped from the decoded frame.
                                pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(maxFrameLength, 0, 4, 0, 4)); // frame size limit is maxFrameLength (earlier note said half MB -- verify)
                            }
                        }, new BatchedRxEventPipelineConfigurator()))
                        .connect()
                        // send subscription request, get input stream
                        .flatMap(new Func1<ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>>, Observable<RemoteRxEvent>>() {
                            // Sends the subscribe event over the new connection, registers the connection
                            // with remoteUnsubscribe, and returns the connection's input stream.
                            @Override
                            public Observable<RemoteRxEvent> call(final ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> connection) {
                                // send subscribe event to server
                                connection.writeAndFlush(RemoteRxEvent.subscribed(params.getName(), params.getSubscribeParameters()));
                                remoteUnsubscribe.setConnection(connection);

                                final String dropOperatorName = "incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServerGroups";
                                final Observable<RemoteRxEvent> incomingEvents = connection.getInput()
                                        .lift(new DropOperator<RemoteRxEvent>(dropOperatorName));
                                // rebatchRequests requires a positive batch size, so fall back to 1
                                // when bufferSize is zero or negative
                                final int requestBatchSize = Math.max(1, bufferSize);
                                return incomingEvents.rebatchRequests(requestBatchSize);
                            }
                        })
                        .doOnCompleted(new Action0() {
                            // Runs when the event stream completes normally: logs a warning and
                            // invokes the disconnect callback.
                            @Override
                            public void call() {
                                final String completedMessage = "Detected connection completed when trying to connect to host: "
                                        + params.getHost() + " port: " + params.getPort();
                                logger.warn(completedMessage);
                                connectionDisconnectCallback.call();
                            }
                        })
                        .onErrorResumeNext(new Func1<Throwable, Observable<RemoteRxEvent>>() {
                            // Runs when the event stream fails: logs the error, invokes the disconnect
                            // callback, and substitutes an empty stream so downstream sees completion
                            // instead of the error.
                            @Override
                            public Observable<RemoteRxEvent> call(Throwable t1) {
                                final String errorMessage = "Detected connection error when trying to connect to host: "
                                        + params.getHost() + " port: " + params.getPort();
                                logger.warn(errorMessage, t1);
                                connectionDisconnectCallback.call();
                                return Observable.empty();
                            }
                        })
                        .takeUntil(closeTrigger)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



mantis-remote-observable/src/main/java/io/reactivex/mantis/remote/observable/RemoteObservable.java [375:427]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        final Decoder<K> keyDecoder = params.getKeyDecoder();
        final Decoder<V> valueDecoder = params.getValueDecoder();
        loadFastProperties();
        return
                RxNetty.createTcpClient(params.getHost(), params.getPort(), new PipelineConfiguratorComposite<RemoteRxEvent, List<RemoteRxEvent>>(
                        new PipelineConfigurator<RemoteRxEvent, List<RemoteRxEvent>>() {
                            // Assembles the client channel pipeline: optional Netty logging, optional
                            // idle detection + heartbeat, optional gzip compression, and finally
                            // 4-byte length-prefixed framing. Handler order matters; do not reorder.
                            @Override
                            public void configureNewPipeline(ChannelPipeline pipeline) {
                                if (enableNettyLogging) {
                                    // addFirst places the logger at the head of the pipeline
                                    pipeline.addFirst(new LoggingHandler(LogLevel.ERROR)); // active only when enableNettyLogging is set; logs at ERROR level
                                }
                                if (enableHeartBeating) {
                                    // IdleStateHandler arguments are (readerIdleSeconds, writerIdleSeconds, allIdleSeconds):
                                    // reader-idle after 10s, writer-idle after 2s, all-idle detection disabled (0)
                                    pipeline.addLast("idleStateHandler", new IdleStateHandler(10, 2, 0));
                                    // presumably reacts to the idle events raised by the handler above -- confirm in HeartbeatHandler
                                    pipeline.addLast("heartbeat", new HeartbeatHandler());
                                }
                                if (enableCompression) {
                                    // NOTE(review): the handler names look swapped relative to the handlers they label --
                                    // "gzipInflater" is registered for the encoder (compresses outbound data) and
                                    // "gzipDeflater" for the decoder (decompresses inbound data). Left unchanged because
                                    // other code may look these handlers up by name -- verify before renaming.
                                    pipeline.addLast("gzipInflater", new JdkZlibEncoder(ZlibWrapper.GZIP));
                                    pipeline.addLast("gzipDeflater", new JdkZlibDecoder(ZlibWrapper.GZIP));
                                }
                                pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); // 4 bytes to encode length
                                // Decoder arguments: length field at offset 0, 4 bytes long, no length adjustment,
                                // and the 4 length bytes are stripped from the decoded frame.
                                pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(maxFrameLength, 0, 4, 0, 4)); // frame size limit is maxFrameLength (earlier note said half MB -- verify)
                            }
                        }, new BatchedRxEventPipelineConfigurator()))
                        .connect()
                        // send subscription request, get input stream
                        .flatMap(new Func1<ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>>, Observable<RemoteRxEvent>>() {
                            // Sends the subscribe event over the new connection, registers the connection
                            // with remoteUnsubscribe, and returns the connection's input stream.
                            @Override
                            public Observable<RemoteRxEvent> call(final ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> connection) {
                                // send subscribe event to server
                                connection.writeAndFlush(RemoteRxEvent.subscribed(params.getName(), params.getSubscribeParameters()));
                                remoteUnsubscribe.setConnection(connection);

                                final String dropOperatorName = "incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServerGroups";
                                final Observable<RemoteRxEvent> incomingEvents = connection.getInput()
                                        .lift(new DropOperator<RemoteRxEvent>(dropOperatorName));
                                // rebatchRequests requires a positive batch size, so fall back to 1
                                // when bufferSize is zero or negative
                                final int requestBatchSize = Math.max(1, bufferSize);
                                return incomingEvents.rebatchRequests(requestBatchSize);
                            }
                        })
                        .doOnCompleted(new Action0() {
                            // Runs when the event stream completes normally: logs a warning and
                            // invokes the disconnect callback.
                            @Override
                            public void call() {
                                final String completedMessage = "Detected connection completed when trying to connect to host: "
                                        + params.getHost() + " port: " + params.getPort();
                                logger.warn(completedMessage);
                                connectionDisconnectCallback.call();
                            }
                        })
                        .onErrorResumeNext(new Func1<Throwable, Observable<RemoteRxEvent>>() {
                            // Runs when the event stream fails: logs the error, invokes the disconnect
                            // callback, and substitutes an empty stream so downstream sees completion
                            // instead of the error.
                            @Override
                            public Observable<RemoteRxEvent> call(Throwable t1) {
                                final String errorMessage = "Detected connection error when trying to connect to host: "
                                        + params.getHost() + " port: " + params.getPort();
                                logger.warn(errorMessage, t1);
                                connectionDisconnectCallback.call();
                                return Observable.empty();
                            }
                        })
                        .takeUntil(closeTrigger)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



