public RxServer createServer()

in mantis-network/src/main/java/io/reactivex/mantis/network/push/LegacyTcpPushServer.java [62:177]


    /**
     * Builds (but does not start) the legacy TCP push server bound to {@code port}.
     *
     * <p>For every accepted connection the handler reads incoming {@link RemoteRxEvent}s.
     * When an event of type {@code subscribed} arrives, its subscribe parameters are
     * parsed and the connection is handed to {@code manageConnection(...)}. Events of
     * any other type produce no output.
     *
     * <p>Subscribe parameters read here:
     * <ul>
     *   <li>{@code id}, {@code slotId}, {@code groupId} - client identity, {@code null} when absent</li>
     *   <li>{@code sample} - sampling interval, multiplied by 1000 to get milliseconds</li>
     *   <li>{@code sampleMSec} - sampling interval in milliseconds; wins over {@code sample}
     *       when both are supplied because it is evaluated last</li>
     *   <li>{@code availabilityZone} - used only when non-null and non-empty</li>
     * </ul>
     * Every parameter (including the ones above) is also copied into a
     * {@code Map<String, List<String>>} that is passed to the configured {@code predicate}.
     *
     * <p>A resulting sampling interval below 50 ms raises {@link IllegalArgumentException},
     * and a non-numeric sampling value raises {@link NumberFormatException}; both are
     * thrown from inside the {@code flatMap} function.
     *
     * @return the configured server instance
     */
    public RxServer<?, ?> createServer() {
        RxServer<RemoteRxEvent, RemoteRxEvent> server
                = RxNetty.newTcpServerBuilder(port, new ConnectionHandler<RemoteRxEvent, RemoteRxEvent>() {
            @Override
            public Observable<Void> handle(
                    final ObservableConnection<RemoteRxEvent, RemoteRxEvent> newConnection) {

                final InetSocketAddress socketAddress = (InetSocketAddress) newConnection.getChannel().remoteAddress();

                // extract groupId, id, predicate from the incoming subscribe request
                return
                        newConnection.getInput()
                                .flatMap(new Func1<RemoteRxEvent, Observable<Void>>() {

                                    @Override
                                    public Observable<Void> call(
                                            RemoteRxEvent incomingRequest) {

                                        if (incomingRequest.getType() != RemoteRxEvent.Type.subscribed) {
                                            // Only subscribe requests are acted on. Hand flatMap an empty
                                            // Observable rather than null so other events are simply ignored.
                                            return Observable.<Void>empty();
                                        }

                                        Map<String, String> params = incomingRequest.getSubscribeParameters();

                                        // client state
                                        String id = null;
                                        String slotId = null;
                                        String groupId = null;
                                        // sample state
                                        boolean enableSampling = false;
                                        long samplingTimeMsec = 0;
                                        String availabilityZone = null;

                                        // predicate state; stays null when no parameters were sent
                                        Map<String, List<String>> predicateParams = null;

                                        if (params != null && !params.isEmpty()) {
                                            // the predicate factory takes multi-valued params, so wrap each value in a list
                                            predicateParams = new HashMap<>();
                                            for (Entry<String, String> entry : params.entrySet()) {
                                                List<String> values = new LinkedList<>();
                                                values.add(entry.getValue());
                                                predicateParams.put(entry.getKey(), values);
                                            }

                                            // Map.get yields null for a missing key, which is already the default
                                            id = params.get("id");
                                            slotId = params.get("slotId");
                                            groupId = params.get("groupId");

                                            if (params.containsKey("sample")) {
                                                samplingTimeMsec = Long.parseLong(params.get("sample")) * 1000;
                                                if (samplingTimeMsec < 50) {
                                                    throw new IllegalArgumentException("Sampling rate too low: " + samplingTimeMsec);
                                                }
                                                enableSampling = true;
                                            }
                                            // evaluated after "sample", so it takes precedence when both are present
                                            if (params.containsKey("sampleMSec")) {
                                                samplingTimeMsec = Long.parseLong(params.get("sampleMSec"));
                                                if (samplingTimeMsec < 50) {
                                                    throw new IllegalArgumentException("Sampling rate too low: " + samplingTimeMsec);
                                                }
                                                enableSampling = true;
                                            }

                                            // null-safe: a key mapped to a null value must not blow up here
                                            String requestedZone = params.get("availabilityZone");
                                            if (requestedZone != null && !requestedZone.isEmpty()) {
                                                availabilityZone = requestedZone;
                                            }
                                        }

                                        Func1<T, Boolean> predicateFunction = null;
                                        if (predicate != null) {
                                            predicateFunction = predicate.call(predicateParams);
                                        }

                                        // support legacy metrics per connection
                                        // (when slotId is absent the name ends in the literal "null")
                                        Metrics sseSinkMetrics = new Metrics.Builder()
                                                .name("DropOperator_outgoing_subject_" + slotId)
                                                .addCounter("onNext")
                                                .addCounter("dropped")
                                                .build();

                                        sseSinkMetrics = metricsRegistry.registerAndGet(sseSinkMetrics);

                                        Counter legacyMsgProcessedCounter = sseSinkMetrics.getCounter("onNext");
                                        Counter legacyDroppedWrites = sseSinkMetrics.getCounter("dropped");

                                        return manageConnection(newConnection, socketAddress.getHostString(), socketAddress.getPort(),
                                                groupId, slotId, id, null,
                                                false, null, enableSampling, samplingTimeMsec,
                                                predicateFunction, null, legacyMsgProcessedCounter, legacyDroppedWrites,
                                                null, availabilityZone);
                                    }
                                });
            }
        })
                .pipelineConfigurator(new PipelineConfiguratorComposite<RemoteRxEvent, RemoteRxEvent>(
                        new PipelineConfigurator<RemoteRxEvent, RemoteRxEvent>() {
                            @Override
                            public void configureNewPipeline(ChannelPipeline pipeline) {

                                // pipeline.addLast(new LoggingHandler(LogLevel.ERROR)); // uncomment to enable debug logging
                                // idle detection: 10 s without reads, 2 s without writes, combined timer disabled
                                pipeline.addLast("idleStateHandler", new IdleStateHandler(10, 2, 0));
                                pipeline.addLast("heartbeat", new HeartbeatHandler());
                                // NOTE(review): the two handler names below look swapped relative to the handler
                                // classes (encoder compresses, decoder decompresses). Left untouched because other
                                // code may look these handlers up by name - confirm before renaming.
                                pipeline.addLast("gzipInflater", new JdkZlibEncoder(ZlibWrapper.GZIP));
                                pipeline.addLast("gzipDeflater", new JdkZlibDecoder(ZlibWrapper.GZIP));
                                pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); // 4 bytes to encode length
                                pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(5242880, 0, 4, 0, 4)); // max frame = 5 MiB (5 * 1024 * 1024 bytes)

                            }
                        }, new LegacyTcpPipelineConfigurator(name)))
                // write buffer water marks: low = 1 MiB, high = 5 MiB
                .channelOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 1024, 5 * 1024 * 1024))

                .build();
        return server;
    }