in modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java [791:971]
/**
 * Builds the TCP communication NIO server, trying the ports of the configured range one by one
 * and keeping the first port for which the server could be built.
 * <p>
 * On success the chosen port is recorded through {@code cfg.boundTcpPort(int)} and the idle timeout
 * of the server is set from {@code cfg.idleConnectionTimeout()}.
 *
 * @return Newly built NIO server.
 * @throws IgniteCheckedException If the configuration already reports a non-negative bound port,
 *      or if building the server failed for every port of the range (the last failure is the cause).
 * @throws IgniteSpiException If a build failure has an {@link SSLException} in its cause chain.
 */
public GridNioServer<Message> resetNioServer() throws IgniteCheckedException {
    if (cfg.boundTcpPort() >= 0)
        throw new IgniteCheckedException("Tcp NIO server was already created on port " + cfg.boundTcpPort());

    IgniteCheckedException lastFailure = null;

    // Upper bound (inclusive) of the ports to try. A local port of -1 gives a single attempt with port -1,
    // a zero range gives a single attempt with the configured local port.
    final int lastPort;

    if (cfg.localPort() == -1)
        lastPort = -1;
    else if (cfg.localPortRange() == 0)
        lastPort = cfg.localPort();
    else
        lastPort = cfg.localPort() + cfg.localPortRange() - 1;

    for (int candidatePort = cfg.localPort(); candidatePort <= lastPort; candidatePort++) {
        try {
            // Delegating factory: the real factory is fetched from the SPI context on first use only.
            MessageFactory msgFactory = new MessageFactory() {
                /** Lazily resolved factory all calls are forwarded to. */
                private MessageFactory delegate;

                /** {@inheritDoc} */
                @Override public void register(short directType, Supplier<Message> supplier) throws IgniteException {
                    resolve().register(directType, supplier);
                }

                /** {@inheritDoc} */
                @Nullable @Override public Message create(short type) {
                    return resolve().create(type);
                }

                /**
                 * @return Factory taken from the SPI context, cached after the first call.
                 */
                private MessageFactory resolve() {
                    if (delegate != null)
                        return delegate;

                    delegate = stateProvider.getSpiContext().messageFactory();

                    assert delegate != null;

                    return delegate;
                }
            };

            // Reader factory: caches the formatter and refreshes it when the SPI context instance changes.
            GridNioMessageReaderFactory readerFactory = new GridNioMessageReaderFactory() {
                /** SPI context the cached formatter was taken from. */
                private IgniteSpiContext cachedCtx;

                /** Cached formatter. */
                private MessageFormatter cachedFormatter;

                /** {@inheritDoc} */
                @Override public MessageReader reader(GridNioSession ses, MessageFactory msgFactory)
                    throws IgniteCheckedException {
                    final IgniteSpiContext curCtx = stateProvider.getSpiContextWithoutInitialLatch();

                    if (cachedFormatter == null || cachedCtx != curCtx) {
                        cachedCtx = curCtx;
                        cachedFormatter = cachedCtx.messageFormatter();
                    }

                    assert cachedFormatter != null;

                    ConnectionKey connKey = ses.meta(CONN_IDX_META);

                    // No connection key in the session meta - no reader.
                    if (connKey == null)
                        return null;

                    return cachedFormatter.reader(connKey.nodeId(), msgFactory);
                }
            };

            // Writer factory: same caching scheme as the reader factory.
            GridNioMessageWriterFactory writerFactory = new GridNioMessageWriterFactory() {
                /** SPI context the cached formatter was taken from. */
                private IgniteSpiContext cachedCtx;

                /** Cached formatter. */
                private MessageFormatter cachedFormatter;

                /** {@inheritDoc} */
                @Override public MessageWriter writer(GridNioSession ses) throws IgniteCheckedException {
                    final IgniteSpiContext curCtx = stateProvider.getSpiContextWithoutInitialLatch();

                    if (cachedFormatter == null || cachedCtx != curCtx) {
                        cachedCtx = curCtx;
                        cachedFormatter = cachedCtx.messageFormatter();
                    }

                    assert cachedFormatter != null;

                    ConnectionKey connKey = ses.meta(CONN_IDX_META);

                    // No connection key in the session meta - no writer.
                    if (connKey == null)
                        return null;

                    return cachedFormatter.writer(connKey.nodeId());
                }
            };

            GridDirectParser parser = new GridDirectParser(
                log.getLogger(GridDirectParser.class),
                msgFactory,
                readerFactory
            );

            IgnitePredicate<Message> skipRecoveryPred = RecoveryLastReceivedMessage.class::isInstance;

            boolean clientMode = Boolean.TRUE.equals(igniteCfg.isClientMode());

            // Queue size listener is installed only on non-client nodes with a positive slow client queue limit.
            IgniteBiInClosure<GridNioSession, Integer> queueSizeMonitor = null;

            if (!clientMode && cfg.slowClientQueueLimit() > 0)
                queueSizeMonitor = this::checkClientQueueSize;

            // Filter chain, in order: optional tracer, codec, connection bytes verifier, optional SSL.
            List<GridNioFilter> filterList = new ArrayList<>();

            if (tracing instanceof GridTracingManager && ((GridManager)tracing).enabled())
                filterList.add(new GridNioTracerFilter(log, tracing));

            filterList.add(new GridNioCodecFilter(parser, log, true));
            filterList.add(new GridConnectionBytesVerifyFilter(log));

            if (stateProvider.isSslEnabled()) {
                GridNioSslFilter sslFilter = new GridNioSslFilter(
                    igniteCfg.getSslContextFactory().create(),
                    true,
                    ByteOrder.LITTLE_ENDIAN,
                    log,
                    metricMgr == null ? null : metricMgr.registry(COMMUNICATION_METRICS_GROUP_NAME));

                sslFilter.directMode(true);
                sslFilter.wantClientAuth(true);
                sslFilter.needClientAuth(true);

                filterList.add(sslFilter);
            }

            GridNioFilter[] filterChain = filterList.toArray(new GridNioFilter[filterList.size()]);

            GridNioServer.Builder<Message> srvBuilder = GridNioServer.<Message>builder()
                .address(cfg.localHost())
                .port(candidatePort)
                .listener(srvLsnr)
                .logger(log)
                .selectorCount(cfg.selectorsCount())
                .igniteInstanceName(igniteInstanceName)
                .serverName("tcp-comm")
                .tcpNoDelay(cfg.tcpNoDelay())
                .directBuffer(cfg.directBuffer())
                .byteOrder(ByteOrder.LITTLE_ENDIAN)
                .socketSendBufferSize(cfg.socketSendBuffer())
                .socketReceiveBufferSize(cfg.socketReceiveBuffer())
                .sendQueueLimit(cfg.messageQueueLimit())
                .directMode(true)
                .writeTimeout(cfg.socketWriteTimeout())
                .selectorSpins(cfg.selectorSpins())
                .filters(filterChain)
                .writerFactory(writerFactory)
                .skipRecoveryPredicate(skipRecoveryPred)
                .messageQueueSizeListener(queueSizeMonitor)
                .tracing(tracing)
                .readWriteSelectorsAssign(cfg.usePairedConnections());

            // Worker listener and metric registry are attached only when a metric manager is available.
            if (metricMgr != null) {
                srvBuilder.workerListener(workersRegistry)
                    .metricRegistry(metricMgr.registry(COMMUNICATION_METRICS_GROUP_NAME));
            }

            GridNioServer<Message> nioSrv = srvBuilder.build();

            // Build succeeded: remember the port before reporting it.
            cfg.boundTcpPort(candidatePort);

            if (log.isInfoEnabled()) {
                log.info("Successfully bound communication NIO server to TCP port " +
                    "[port=" + cfg.boundTcpPort() +
                    ", locHost=" + cfg.localHost() +
                    ", selectorsCnt=" + cfg.selectorsCount() +
                    ", selectorSpins=" + nioSrv.selectorSpins() +
                    ", pairedConn=" + cfg.usePairedConnections() + ']');
            }

            nioSrv.idleTimeout(cfg.idleConnectionTimeout());

            return nioSrv;
        }
        catch (IgniteCheckedException e) {
            // A failure caused by SSL is not retried on the next port - it is rethrown right away.
            if (X.hasCause(e, SSLException.class))
                throw new IgniteSpiException("Failed to create SSL context. SSL factory: "
                    + igniteCfg.getSslContextFactory() + '.', e);

            lastFailure = e;

            String bindFailMsg = "Failed to bind to local port (will try next port within range) [port=" +
                candidatePort + ", locHost=" + cfg.localHost() + ']';

            if (log.isDebugEnabled())
                log.debug(bindFailMsg);

            // The failure is always reported to the exception registry, regardless of the log level.
            eRegistrySupplier.get().onException(bindFailMsg, e);
        }
    }

    // Every port of the range was tried and none succeeded.
    throw new IgniteCheckedException("Failed to bind to any port within range [startPort=" + cfg.localPort() +
        ", portRange=" + cfg.localPortRange() + ", locHost=" + cfg.localHost() + ']', lastFailure);
}