in common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java [269:375]
/**
 * Opens a new connection to {@code address}, optionally completes the TLS handshake, and runs
 * every configured {@link TransportClientBootstrap} before handing the client back.
 *
 * <p>This call blocks the calling thread until the connection attempt (and, when SSL encryption
 * is enabled, the TLS handshake) finishes or times out, and until all bootstraps have run.
 *
 * @param address remote address to connect to
 * @param decoder inbound handler passed to {@code context.initializePipeline} when the channel's
 *     pipeline is set up
 * @return the client bound to the newly established channel
 * @throws IOException if connecting fails, is cancelled or times out, or if the TLS handshake
 *     fails or does not complete within the connection timeout
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
private TransportClient internalCreateClient(
    InetSocketAddress address, ChannelInboundHandlerAdapter decoder)
    throws IOException, InterruptedException {
  Bootstrap bootstrap = new Bootstrap();
  bootstrap
      .group(workerGroup)
      .channel(socketChannelClass)
      // Disable Nagle's Algorithm since we don't want packets to wait
      .option(ChannelOption.TCP_NODELAY, true)
      .option(ChannelOption.SO_KEEPALIVE, true)
      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs)
      .option(ChannelOption.ALLOCATOR, allocator);
  // Non-positive buffer sizes mean "leave the socket default untouched".
  if (receiveBuf > 0) {
    bootstrap.option(ChannelOption.SO_RCVBUF, receiveBuf);
  }
  if (sendBuf > 0) {
    bootstrap.option(ChannelOption.SO_SNDBUF, sendBuf);
  }

  // Filled in by the channel initializer, read back on this thread after the connect completes.
  final AtomicReference<TransportClient> clientRef = new AtomicReference<>();
  bootstrap.handler(
      new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) {
          TransportChannelHandler clientHandler = context.initializePipeline(ch, decoder, true);
          clientRef.set(clientHandler.getClient());
        }
      });

  // Connect to the remote server
  long preConnect = System.nanoTime();
  ChannelFuture cf = bootstrap.connect(address);
  if (connectTimeoutMs <= 0) {
    // No timeout configured: wait for the attempt to finish, however long it takes.
    cf.await();
    assert cf.isDone();
    if (cf.isCancelled()) {
      throw new IOException(String.format("Connecting to %s cancelled", address));
    } else if (!cf.isSuccess()) {
      throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
    }
  } else if (!cf.await(connectTimeoutMs)) {
    // The attempt may still be in flight; close the channel so that a connection completing
    // after we gave up is not leaked.
    cf.channel().close();
    throw new CelebornIOException(
        String.format("Connecting to %s timed out (%s ms)", address, connectTimeoutMs));
  } else if (cf.cause() != null) {
    throw new CelebornIOException(String.format("Failed to connect to %s", address), cf.cause());
  }

  if (context.sslEncryptionEnabled()) {
    final SslHandler sslHandler = cf.channel().pipeline().get(SslHandler.class);
    sslHandler.setHandshakeTimeoutMillis(sslHandshakeTimeoutMs);
    Future<Channel> future =
        sslHandler
            .handshakeFuture()
            .addListener(
                new GenericFutureListener<Future<Channel>>() {
                  @Override
                  public void operationComplete(final Future<Channel> handshakeFuture) {
                    if (handshakeFuture.isSuccess()) {
                      logger.debug("successfully completed TLS handshake to {}", address);
                    } else {
                      logger.info(
                          "failed to complete TLS handshake to {}",
                          address,
                          handshakeFuture.cause());
                      cf.channel().close();
                    }
                  }
                });
    if (!future.await(connectionTimeoutMs)) {
      cf.channel().close();
      throw new IOException(
          String.format("Failed to connect to %s within connection timeout", address));
    }
    // await() returning true only means the handshake finished, not that it succeeded.
    // Surface a failed handshake here instead of bootstrapping on a channel being closed.
    if (!future.isSuccess()) {
      cf.channel().close();
      throw new IOException(
          String.format("Failed to complete TLS handshake to %s", address), future.cause());
    }
  }

  TransportClient client = clientRef.get();
  assert client != null : "Channel future completed successfully with null client";

  // Execute any client bootstraps synchronously before marking the Client as successful.
  long preBootstrap = System.nanoTime();
  logger.debug("Running bootstraps for {} ...", address);
  try {
    for (TransportClientBootstrap clientBootstrap : clientBootstraps) {
      clientBootstrap.doBootstrap(client);
    }
  } catch (Exception e) { // catch non-RuntimeExceptions too as bootstrap may be written in Scala
    long bootstrapTime = System.nanoTime() - preBootstrap;
    logger.error(
        "Exception while bootstrapping client after {}",
        Utils.nanoDurationToString(bootstrapTime),
        e);
    // A client that failed bootstrapping must not be handed out; release it before rethrowing.
    client.close();
    throw Throwables.propagate(e);
  }
  long postBootstrap = System.nanoTime();
  logger.debug(
      "Successfully created connection to {} after {} ({} spent in bootstraps)",
      address,
      Utils.nanoDurationToString(postBootstrap - preConnect),
      Utils.nanoDurationToString(postBootstrap - preBootstrap));
  return client;
}