in sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Connector.java [62:168]
public IoConnectFuture connect(
SocketAddress address, AttributeRepository context, SocketAddress localAddress) {
boolean debugEnabled = log.isDebugEnabled();
if (debugEnabled) {
log.debug("Connecting to {}", address);
}
IoConnectFuture future = new DefaultIoConnectFuture(address, null);
AsynchronousSocketChannel channel = null;
AsynchronousSocketChannel socket = null;
try {
AsynchronousChannelGroup group = getChannelGroup();
channel = openAsynchronousSocketChannel(address, group);
socket = setSocketOptions(channel);
if (localAddress != null) {
socket.bind(localAddress);
}
Nio2CompletionHandler<Void, Object> completionHandler = ValidateUtils.checkNotNull(
createConnectionCompletionHandler(
future, socket, context, propertyResolver, getIoHandler()),
"No connection completion handler created for %s",
address);
// With a completion handler there is no way to cancel an ongoing connection attempt. We could only let
// the attempt proceed to failure or success, and if successful, close the established channel again. With a
// future, we can cancel the future to abort the connection attempt, but we need to use our own thread pool
// for waiting on the future and invoking the completion handler.
Future<Void> cf = socket.connect(address);
Long connectTimeout = CoreModuleProperties.IO_CONNECT_TIMEOUT.get(propertyResolver).map(d -> {
if (d.isZero() || d.isNegative()) {
return null;
}
long millis;
try {
millis = d.toMillis();
} catch (ArithmeticException e) {
millis = Long.MAX_VALUE;
}
return Long.valueOf(millis);
}).orElse(null);
Future<?> rf = getExecutorService().submit(() -> {
try {
if (connectTimeout != null) {
log.debug("connect({}): waiting for connection (timeout={}ms)", address, connectTimeout);
cf.get(connectTimeout.longValue(), TimeUnit.MILLISECONDS);
} else {
log.debug("connect({}): waiting for connection", address);
cf.get();
}
completionHandler.onCompleted(null, null);
} catch (CancellationException e) {
CancelFuture cancellation = future.cancel();
if (cancellation != null) {
cancellation.setCanceled(e);
}
} catch (TimeoutException e) {
cf.cancel(true);
ConnectException c = new ConnectException("I/O connection time-out of " + connectTimeout + "ms expired");
c.initCause(e);
completionHandler.onFailed(c, null);
} catch (ExecutionException e) {
completionHandler.onFailed(e, null);
} catch (InterruptedException e) {
completionHandler.onFailed(e, null);
Thread.currentThread().interrupt();
}
});
future.addListener(f -> {
if (f.isCanceled()) {
// Don't interrupt if already running; if inside completionHandler.onCompleted() it might cause
// general confusion.
rf.cancel(false);
cf.cancel(true);
}
});
} catch (Throwable exc) {
Throwable t = ExceptionUtils.peelException(exc);
debug("connect({}) failed ({}) to schedule connection: {}",
address, t.getClass().getSimpleName(), t.getMessage(), t);
try {
if (socket != null) {
socket.close();
}
} catch (IOException err) {
if (debugEnabled) {
log.debug("connect({}) - failed ({}) to close socket: {}",
address, err.getClass().getSimpleName(), err.getMessage());
}
}
try {
if (channel != null) {
channel.close();
}
} catch (IOException err) {
if (debugEnabled) {
log.debug("connect({}) - failed ({}) to close channel: {}",
address, err.getClass().getSimpleName(), err.getMessage());
}
}
future.setException(t);
}
return future;
}