public PushServer()

in mantis-network/src/main/java/io/reactivex/mantis/network/push/PushServer.java [81:164]


    /**
     * Creates a push server from the supplied trigger, configuration and signal stream.
     *
     * <p>Construction performs the following steps, in this order:
     * <ol>
     *   <li>copies the name, max-not-writable time and metrics registry out of {@code config};</li>
     *   <li>creates the outbound {@link MonitoredQueue} and installs it as the trigger's buffer;</li>
     *   <li>creates the {@link ConnectionManager}, handing it one callback that calls
     *       {@code trigger.start()} and one that logs and calls {@code trigger.stop()};</li>
     *   <li>creates the "QueueConsumerPool" thread pool and submits the chunker task(s) to it,
     *       recording each returned future in {@code consumerThreadFutures};</li>
     *   <li>builds the server-level metrics and registers them together with the pool, buffer,
     *       trigger and router metrics;</li>
     *   <li>stores the port and write retry count and creates the scheduled executor whose
     *       threads are named {@code netty-channel-checker-%d}.</li>
     * </ol>
     *
     * <p>Note that the chunker tasks are submitted from within this constructor.
     *
     * @param trigger       source whose buffer is set to this server's outbound queue
     * @param config        server configuration (name, port, buffer/chunk sizing, metrics registry,
     *                      chunk processor, queue type and consumer count)
     * @param serverSignals signal stream stored on the server as-is
     */
    public PushServer(final PushTrigger<T> trigger, ServerConfig<T> config,
                      Observable<String> serverSignals) {

        this.serverSignals = serverSignals;
        this.serverName = config.getName();
        this.maxNotWritableTimeSec = config.getMaxNotWritableTimeSec();
        this.metricsRegistry = config.getMetricsRegistry();

        // The trigger writes into this queue; the chunkers created below read from it.
        this.outboundBuffer = new MonitoredQueue<>(serverName, config.getBufferCapacity(), config.useSpscQueue());
        trigger.setBuffer(outboundBuffer);

        // Callbacks handed to the connection manager (first-connection / zero-connections slots).
        final Action0 startTrigger = trigger::start;
        final Action0 stopTrigger = () -> {
            logger.info("doOnZeroConnections Triggered");
            trigger.stop();
        };

        // Metrics are tagged with the server name, falling back to "none" when no name is configured.
        final String groupTagValue = (serverName != null) ? serverName : "none";
        final BasicTag groupTag = new BasicTag(GROUP_ID_TAG, groupTagValue);
        final MetricGroupId serverGroupId = new MetricGroupId("PushServer", groupTag);

        // manager will auto add metrics for connection groups
        this.connectionManager = new ConnectionManager<>(metricsRegistry, startTrigger, stopTrigger);

        final int consumerCount = config.getNumQueueConsumers();
        final ThreadPoolExecutor consumerExecutor = new ThreadPoolExecutor(
            consumerCount,
            consumerCount,
            5,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(consumerCount),
            new NamedThreadFactory("QueueConsumerPool"));
        final MonitoredThreadPool consumerPool = new MonitoredThreadPool("QueueConsumerPool", consumerExecutor);

        logger.info("PushServer create consumer threads, use spsc: {}, num threads: {}, "
                + "buffer capacity: {}, chunk size: {}, chunk time ms: {}",
            config.useSpscQueue(), consumerCount, config.getBufferCapacity(),
            config.getMaxChunkSize(), config.getMaxChunkTimeMSec());

        if (!config.useSpscQueue()) {
            // Non-SPSC queue: submit one timed chunker per configured consumer.
            for (int submitted = 0; submitted < consumerCount; submitted++) {
                final TimedChunker<T> timedChunker = new TimedChunker<>(
                    outboundBuffer,
                    config.getMaxChunkSize(),
                    config.getMaxChunkTimeMSec(),
                    config.getChunkProcessor(),
                    connectionManager);
                consumerThreadFutures.add(consumerPool.submit(timedChunker));
            }
        } else {
            // SPSC queue: exactly one chunker task is submitted, regardless of the consumer count.
            final SingleThreadedChunker<T> singleChunker = new SingleThreadedChunker<>(
                config.getChunkProcessor(),
                outboundBuffer,
                config.getMaxChunkSize(),
                config.getMaxChunkTimeMSec(),
                connectionManager);
            consumerThreadFutures.add(consumerPool.submit(singleChunker));
        }

        // Server-level write counters and gauges, grouped under the "PushServer" metric group.
        final Metrics pushServerMetrics = new Metrics.Builder()
            .id(serverGroupId)
            .addCounter("numProcessedWrites")
            .addCounter("numSuccessfulWrites")
            .addCounter("numFailedWrites")
            .addGauge(connectionManager.getActiveConnections(serverGroupId))
            .addGauge("batchWriteSize")
            .build();
        this.successfulWrites = pushServerMetrics.getCounter("numSuccessfulWrites");
        this.failedWrites = pushServerMetrics.getCounter("numFailedWrites");
        this.batchWriteSize = pushServerMetrics.getGauge("batchWriteSize");
        this.processedWrites = pushServerMetrics.getCounter("numProcessedWrites");

        registerMetrics(
            metricsRegistry,
            pushServerMetrics,
            consumerPool.getMetrics(),
            outboundBuffer.getMetrics(),
            trigger.getMetrics(),
            config.getChunkProcessor().router.getMetrics());

        this.port = config.getPort();
        this.writeRetryCount = config.getWriteRetryCount();
        this.scheduledExecutorService = new ScheduledThreadPoolExecutor(
            10,
            new ThreadFactoryBuilder().setNameFormat("netty-channel-checker-%d").build());
    }