private TransportClient internalCreateClient()

in common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java [269:375]


  /**
   * Opens a new connection to {@code address}, waits for the connect (and, when SSL encryption is
   * enabled on the context, the TLS handshake) to finish, and then runs every configured client
   * bootstrap synchronously before handing the client back.
   *
   * @param address remote endpoint to connect to
   * @param decoder handler passed to {@code context.initializePipeline} when the channel is set up
   * @return the client taken from the channel handler installed on the new channel
   * @throws IOException if the connect is cancelled, fails or times out, or if the TLS handshake
   *     fails or does not complete within {@code connectionTimeoutMs} (the timed connect path
   *     reports its failures as {@code CelebornIOException})
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  private TransportClient internalCreateClient(
      InetSocketAddress address, ChannelInboundHandlerAdapter decoder)
      throws IOException, InterruptedException {
    Bootstrap bootstrap = new Bootstrap();
    bootstrap
        .group(workerGroup)
        .channel(socketChannelClass)
        // Disable Nagle's Algorithm since we don't want packets to wait
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.SO_KEEPALIVE, true)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs)
        .option(ChannelOption.ALLOCATOR, allocator);

    // Only override the socket buffer sizes when a positive value was configured.
    if (receiveBuf > 0) {
      bootstrap.option(ChannelOption.SO_RCVBUF, receiveBuf);
    }

    if (sendBuf > 0) {
      bootstrap.option(ChannelOption.SO_SNDBUF, sendBuf);
    }

    // Filled in by the channel initializer, read back by this thread once the connect is done.
    final AtomicReference<TransportClient> clientRef = new AtomicReference<>();

    bootstrap.handler(
        new ChannelInitializer<SocketChannel>() {
          @Override
          public void initChannel(SocketChannel ch) {
            TransportChannelHandler clientHandler = context.initializePipeline(ch, decoder, true);
            clientRef.set(clientHandler.getClient());
          }
        });

    // Connect to the remote server
    long preConnect = System.nanoTime();
    ChannelFuture cf = bootstrap.connect(address);
    if (connectTimeoutMs <= 0) {
      // No timeout configured: wait until the connect attempt completes one way or another.
      cf.await();
      assert cf.isDone();
      if (cf.isCancelled()) {
        throw new IOException(String.format("Connecting to %s cancelled", address));
      } else if (!cf.isSuccess()) {
        throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
      }
    } else if (!cf.await(connectTimeoutMs)) {
      // Give up on the still-pending attempt so a late connect does not leave an orphaned channel.
      cf.channel().close();
      throw new CelebornIOException(
          String.format("Connecting to %s timed out (%s ms)", address, connectTimeoutMs));
    } else if (cf.cause() != null) {
      throw new CelebornIOException(String.format("Failed to connect to %s", address), cf.cause());
    }
    if (context.sslEncryptionEnabled()) {
      final SslHandler sslHandler = cf.channel().pipeline().get(SslHandler.class);
      sslHandler.setHandshakeTimeoutMillis(sslHandshakeTimeoutMs);
      Future<Channel> future =
          sslHandler
              .handshakeFuture()
              .addListener(
                  new GenericFutureListener<Future<Channel>>() {
                    @Override
                    public void operationComplete(final Future<Channel> handshakeFuture) {
                      if (handshakeFuture.isSuccess()) {
                        logger.debug("successfully completed TLS handshake to {}", address);
                      } else {
                        logger.info(
                            "failed to complete TLS handshake to {}",
                            address,
                            handshakeFuture.cause());
                        cf.channel().close();
                      }
                    }
                  });
      if (!future.await(connectionTimeoutMs)) {
        cf.channel().close();
        throw new IOException(
            String.format("Failed to connect to %s within connection timeout", address));
      }
      // await() returning true only means the handshake finished, not that it succeeded. Surface
      // a failed handshake here instead of running the bootstraps on a channel being closed.
      if (!future.isSuccess()) {
        cf.channel().close();
        throw new IOException(
            String.format("Failed to complete TLS handshake to %s", address), future.cause());
      }
    }

    TransportClient client = clientRef.get();
    assert client != null : "Channel future completed successfully with null client";

    // Execute any client bootstraps synchronously before marking the Client as successful.
    long preBootstrap = System.nanoTime();
    logger.debug("Running bootstraps for {} ...", address);
    try {
      for (TransportClientBootstrap clientBootstrap : clientBootstraps) {
        clientBootstrap.doBootstrap(client);
      }
    } catch (Exception e) { // catch non-RuntimeExceptions too as bootstrap may be written in Scala
      long bootstrapTime = System.nanoTime() - preBootstrap;
      logger.error(
          "Exception while bootstrapping client after {}",
          Utils.nanoDurationToString(bootstrapTime),
          e);
      // A client whose bootstrap failed must not be handed out; release it before rethrowing.
      client.close();
      throw Throwables.propagate(e);
    }
    long postBootstrap = System.nanoTime();
    logger.debug(
        "Successfully created connection to {} after {} ({} spent in bootstraps)",
        address,
        Utils.nanoDurationToString(postBootstrap - preConnect),
        Utils.nanoDurationToString(postBootstrap - preBootstrap));

    return client;
  }