in mantis-network/src/main/java/io/reactivex/mantis/network/push/PushServer.java [81:164]
public PushServer(final PushTrigger<T> trigger, ServerConfig<T> config,
Observable<String> serverSignals) {
this.serverSignals = serverSignals;
serverName = config.getName();
maxNotWritableTimeSec = config.getMaxNotWritableTimeSec();
metricsRegistry = config.getMetricsRegistry();
outboundBuffer = new MonitoredQueue<T>(serverName, config.getBufferCapacity(), config.useSpscQueue());
trigger.setBuffer(outboundBuffer);
Action0 doOnFirstConnection = new Action0() {
@Override
public void call() {
trigger.start();
}
};
Action0 doOnZeroConnections = new Action0() {
@Override
public void call() {
logger.info("doOnZeroConnections Triggered");
trigger.stop();
}
};
final String serverNameValue = Optional.ofNullable(serverName).orElse("none");
final BasicTag idTag = new BasicTag(GROUP_ID_TAG, serverNameValue);
final MetricGroupId metricsGroup = new MetricGroupId("PushServer", idTag);
// manager will auto add metrics for connection groups
connectionManager = new ConnectionManager<T>(metricsRegistry, doOnFirstConnection,
doOnZeroConnections);
int numQueueProcessingThreads = config.getNumQueueConsumers();
MonitoredThreadPool consumerThreads = new MonitoredThreadPool("QueueConsumerPool",
new ThreadPoolExecutor(numQueueProcessingThreads, numQueueProcessingThreads, 5, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(numQueueProcessingThreads), new NamedThreadFactory("QueueConsumerPool")));
logger.info("PushServer create consumer threads, use spsc: {}, num threads: {}, buffer capacity: {}, " +
"chunk size: {}, chunk time ms: {}", config.useSpscQueue(), numQueueProcessingThreads,
config.getBufferCapacity(), config.getMaxChunkSize(), config.getMaxChunkTimeMSec());
if (config.useSpscQueue()) {
consumerThreadFutures.add(consumerThreads.submit(new SingleThreadedChunker<T>(
config.getChunkProcessor(),
outboundBuffer,
config.getMaxChunkSize(),
config.getMaxChunkTimeMSec(),
connectionManager
)));
} else {
for (int i = 0; i < numQueueProcessingThreads; i++) {
consumerThreadFutures.add(consumerThreads.submit(new TimedChunker<T>(
outboundBuffer,
config.getMaxChunkSize(),
config.getMaxChunkTimeMSec(),
config.getChunkProcessor(),
connectionManager
)));
}
}
Metrics serverMetrics = new Metrics.Builder()
.id(metricsGroup)
.addCounter("numProcessedWrites")
.addCounter("numSuccessfulWrites")
.addCounter("numFailedWrites")
.addGauge(connectionManager.getActiveConnections(metricsGroup))
.addGauge("batchWriteSize")
.build();
successfulWrites = serverMetrics.getCounter("numSuccessfulWrites");
failedWrites = serverMetrics.getCounter("numFailedWrites");
batchWriteSize = serverMetrics.getGauge("batchWriteSize");
processedWrites = serverMetrics.getCounter("numProcessedWrites");
registerMetrics(metricsRegistry, serverMetrics, consumerThreads.getMetrics(),
outboundBuffer.getMetrics(), trigger.getMetrics(),
config.getChunkProcessor().router.getMetrics());
port = config.getPort();
writeRetryCount = config.getWriteRetryCount();
scheduledExecutorService = new ScheduledThreadPoolExecutor(10,
new ThreadFactoryBuilder().setNameFormat("netty-channel-checker-%d").build());
}