in sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoConnector.java [56:151]
public IoConnectFuture connect(SocketAddress address, AttributeRepository context, SocketAddress localAddress) {
if (log.isDebugEnabled()) {
log.debug("Connecting to {}", address);
}
IoConnectFuture future = new DefaultIoConnectFuture(address, null);
Bootstrap bootstrap = new Bootstrap().group(factory.eventLoopGroup)
.channel(NioSocketChannel.class)
.attr(CONNECT_FUTURE_KEY, future)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
@SuppressWarnings("synthetic-access")
protected void initChannel(SocketChannel ch) throws Exception {
IoServiceEventListener listener = getIoServiceEventListener();
SocketAddress local = ch.localAddress();
SocketAddress remote = ch.remoteAddress();
try {
if (listener != null) {
try {
listener.connectionEstablished(NettyIoConnector.this, local, context, remote);
} catch (Exception e) {
ch.close();
throw e;
}
}
@SuppressWarnings("resource")
NettyIoSession session = new NettyIoSession(NettyIoConnector.this, handler, null);
if (context != null) {
session.setAttribute(AttributeRepository.class, context);
}
ChannelPipeline p = ch.pipeline();
p.addLast(LOGGING_TRACE); // TODO make this configurable
p.addLast(session.adapter);
} catch (Exception e) {
if (listener != null) {
try {
listener.abortEstablishedConnection(NettyIoConnector.this, local, context, remote, e);
} catch (Exception exc) {
if (log.isDebugEnabled()) {
log.debug("initChannel(" + ch + ") listener=" + listener
+ " ignoring abort event exception",
exc);
}
}
}
throw e;
}
}
});
CoreModuleProperties.IO_CONNECT_TIMEOUT.get(factory.manager).ifPresent(d -> {
if (d.isZero() || d.isNegative()) {
return;
}
long millis;
try {
millis = d.toMillis();
} catch (ArithmeticException e) {
millis = Integer.MAX_VALUE;
}
if (millis > Integer.MAX_VALUE) {
millis = Integer.MAX_VALUE;
}
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf((int) millis));
});
ChannelFuture chf;
if (localAddress != null) {
chf = bootstrap.connect(address, localAddress);
} else {
chf = bootstrap.connect(address);
}
future.addListener(f -> {
if (f.isCanceled()) {
if (chf.cancel(true) || chf.isCancelled()) {
f.getCancellation().setCanceled();
}
}
});
chf.addListener(cf -> {
Throwable t = cf.cause();
if (t != null) {
future.setException(t);
} else if (cf.isCancelled()) {
CancelFuture cancellation = future.cancel();
if (cancellation != null) {
cancellation.setCanceled();
}
}
});
// The future is completed when the session gets a channelActivated event.
return future;
}