private ByteTransport()

in runtime/executor/src/main/java/org/apache/nemo/runtime/executor/transfer/ByteTransport.java [81:176]


  /**
   * Creates the event loop groups and bootstraps, binds the server channel, and registers the
   * bound address with the given {@link NameResolver}.
   *
   * <p>If binding or registration fails, the event loop groups created here are asked to shut
   * down before the exception propagates, so a failed construction does not leave threads behind.
   *
   * @param nameResolver          resolver with which the listening address is registered
   * @param localExecutorId       executor id used to build the {@code ByteTransportIdentifier}
   * @param channelImplSelector   provides the event loop groups and the channel classes
   * @param channelInitializer    handler for client channels and child handler for server channels
   * @param tcpPortProvider       candidate ports that are tried in order when {@code port} is 0
   * @param localAddressProvider  provides the local host address to bind to
   * @param port                  port to bind to; 0 selects a port from {@code tcpPortProvider}
   * @param serverBacklog         value for the server's {@code SO_BACKLOG} option
   * @param numListeningThreads   thread count passed when creating the server listening group
   * @param numWorkingThreads     thread count passed when creating the server working group
   * @param numClientThreads      thread count passed when creating the client group
   * @throws IllegalArgumentException if {@code port} is negative
   * @throws RuntimeException if the server channel cannot be bound, or the listening address
   *                          cannot be registered
   */
  private ByteTransport(
    final NameResolver nameResolver,
    @Parameter(JobConf.ExecutorId.class) final String localExecutorId,
    final NettyChannelImplementationSelector channelImplSelector,
    final ByteTransportChannelInitializer channelInitializer,
    final TcpPortProvider tcpPortProvider,
    final LocalAddressProvider localAddressProvider,
    @Parameter(JobConf.PartitionTransportServerPort.class) final int port,
    @Parameter(JobConf.PartitionTransportServerBacklog.class) final int serverBacklog,
    @Parameter(JobConf.PartitionTransportServerNumListeningThreads.class) final int numListeningThreads,
    @Parameter(JobConf.PartitionTransportServerNumWorkingThreads.class) final int numWorkingThreads,
    @Parameter(JobConf.PartitionTransportClientNumThreads.class) final int numClientThreads) {

    this.nameResolver = nameResolver;

    // Reject an invalid port before any event loop group is created.
    if (port < 0) {
      throw new IllegalArgumentException(String.format("Invalid ByteTransportPort: %d", port));
    }

    final String host = localAddressProvider.getLocalAddress();

    serverListeningGroup = channelImplSelector.newEventLoopGroup(numListeningThreads,
      new DefaultThreadFactory(SERVER_LISTENING));
    serverWorkingGroup = channelImplSelector.newEventLoopGroup(numWorkingThreads,
      new DefaultThreadFactory(SERVER_WORKING));
    clientGroup = channelImplSelector.newEventLoopGroup(numClientThreads, new DefaultThreadFactory(CLIENT));

    clientBootstrap = new Bootstrap()
      .group(clientGroup)
      .channel(channelImplSelector.getChannelClass())
      .handler(channelInitializer)
      .option(ChannelOption.SO_REUSEADDR, true);

    final ServerBootstrap serverBootstrap = new ServerBootstrap()
      .group(serverListeningGroup, serverWorkingGroup)
      .channel(channelImplSelector.getServerChannelClass())
      .childHandler(channelInitializer)
      .option(ChannelOption.SO_BACKLOG, serverBacklog)
      .option(ChannelOption.SO_REUSEADDR, true);

    Channel listeningChannel = null;
    if (port == 0) {
      // No fixed port requested: try the candidate ports in order until one binds.
      for (final int candidatePort : tcpPortProvider) {
        try {
          final ChannelFuture future = serverBootstrap.bind(host, candidatePort).await();
          if (future.cause() != null) {
            LOG.warn("Cannot bind to {}:{} because {}", host, candidatePort, future.cause());
          } else if (!future.isSuccess()) {
            LOG.warn("Cannot bind to {}:{}", host, candidatePort);
          } else {
            listeningChannel = future.channel();
            break;
          }
        } catch (final InterruptedException e) {
          Thread.currentThread().interrupt();
          LOG.debug(String.format("Interrupted while binding to %s:%d", host, candidatePort), e);
          // With the interrupt flag set, every further await() would throw immediately while its
          // bind attempt keeps running unobserved, so stop scanning instead of trying more ports.
          break;
        }
      }
      if (listeningChannel == null) {
        shutdownEventLoopGroups();
        LOG.error("Cannot bind to {} with tcpPortProvider", host);
        throw new RuntimeException(String.format("Cannot bind to %s with tcpPortProvider", host));
      }
    } else {
      try {
        final ChannelFuture future = serverBootstrap.bind(host, port).await();
        if (future.cause() != null) {
          throw future.cause();
        } else if (!future.isSuccess()) {
          throw new RuntimeException("Cannot bind");
        } else {
          listeningChannel = future.channel();
        }
      } catch (final Throwable e) {
        if (e instanceof InterruptedException) {
          // Keep the interrupt visible to callers; it would otherwise be lost in the wrapping below.
          Thread.currentThread().interrupt();
        }
        shutdownEventLoopGroups();
        LOG.error(String.format("Cannot bind to %s:%d when bind ChannelFuture", host, port), e);
        throw new RuntimeException(String.format("Cannot bind to %s:%d when bind ChannelFuture", host, port), e);
      }
    }

    serverListeningChannel = listeningChannel;

    try {
      final ByteTransportIdentifier identifier = new ByteTransportIdentifier(localExecutorId);
      nameResolver.register(identifier, (InetSocketAddress) listeningChannel.localAddress());
    } catch (final Exception e) {
      LOG.error("Cannot register ByteTransport listening address to the naming registry", e);
      // Construction fails here, so release the bound channel and the event loop threads.
      listeningChannel.close();
      shutdownEventLoopGroups();
      throw new RuntimeException(e);
    }

    LOG.info("ByteTransport server in {} is listening at {}", localExecutorId, listeningChannel.localAddress());
  }

  /**
   * Requests shutdown of the server listening, server working and client event loop groups.
   * Called from the constructor's failure paths, after all three groups have been created.
   */
  private void shutdownEventLoopGroups() {
    serverListeningGroup.shutdownGracefully();
    serverWorkingGroup.shutdownGracefully();
    clientGroup.shutdownGracefully();
  }