in runtime/executor/src/main/java/org/apache/nemo/runtime/executor/transfer/ByteTransport.java [81:176]
private ByteTransport(
final NameResolver nameResolver,
@Parameter(JobConf.ExecutorId.class) final String localExecutorId,
final NettyChannelImplementationSelector channelImplSelector,
final ByteTransportChannelInitializer channelInitializer,
final TcpPortProvider tcpPortProvider,
final LocalAddressProvider localAddressProvider,
@Parameter(JobConf.PartitionTransportServerPort.class) final int port,
@Parameter(JobConf.PartitionTransportServerBacklog.class) final int serverBacklog,
@Parameter(JobConf.PartitionTransportServerNumListeningThreads.class) final int numListeningThreads,
@Parameter(JobConf.PartitionTransportServerNumWorkingThreads.class) final int numWorkingThreads,
@Parameter(JobConf.PartitionTransportClientNumThreads.class) final int numClientThreads) {
this.nameResolver = nameResolver;
if (port < 0) {
throw new IllegalArgumentException(String.format("Invalid ByteTransportPort: %d", port));
}
final String host = localAddressProvider.getLocalAddress();
serverListeningGroup = channelImplSelector.newEventLoopGroup(numListeningThreads,
new DefaultThreadFactory(SERVER_LISTENING));
serverWorkingGroup = channelImplSelector.newEventLoopGroup(numWorkingThreads,
new DefaultThreadFactory(SERVER_WORKING));
clientGroup = channelImplSelector.newEventLoopGroup(numClientThreads, new DefaultThreadFactory(CLIENT));
clientBootstrap = new Bootstrap()
.group(clientGroup)
.channel(channelImplSelector.getChannelClass())
.handler(channelInitializer)
.option(ChannelOption.SO_REUSEADDR, true);
final ServerBootstrap serverBootstrap = new ServerBootstrap()
.group(serverListeningGroup, serverWorkingGroup)
.channel(channelImplSelector.getServerChannelClass())
.childHandler(channelInitializer)
.option(ChannelOption.SO_BACKLOG, serverBacklog)
.option(ChannelOption.SO_REUSEADDR, true);
Channel listeningChannel = null;
if (port == 0) {
for (final int candidatePort : tcpPortProvider) {
try {
final ChannelFuture future = serverBootstrap.bind(host, candidatePort).await();
if (future.cause() != null) {
LOG.warn("Cannot bind to {}:{} because {}", host, candidatePort, future.cause());
} else if (!future.isSuccess()) {
LOG.warn("Cannot bind to {}:{}", host, candidatePort);
} else {
listeningChannel = future.channel();
break;
}
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
LOG.debug(String.format("Interrupted while binding to %s:%d", host, candidatePort), e);
}
}
if (listeningChannel == null) {
serverListeningGroup.shutdownGracefully();
serverWorkingGroup.shutdownGracefully();
clientGroup.shutdownGracefully();
LOG.error("Cannot bind to {} with tcpPortProvider", host);
throw new RuntimeException(String.format("Cannot bind to %s with tcpPortProvider", host));
}
} else {
try {
final ChannelFuture future = serverBootstrap.bind(host, port).await();
if (future.cause() != null) {
throw future.cause();
} else if (!future.isSuccess()) {
throw new RuntimeException("Cannot bind");
} else {
listeningChannel = future.channel();
}
} catch (final Throwable e) {
serverListeningGroup.shutdownGracefully();
serverWorkingGroup.shutdownGracefully();
clientGroup.shutdownGracefully();
LOG.error(String.format("Cannot bind to %s:%d when bind ChannelFuture", host, port), e);
throw new RuntimeException(String.format("Cannot bind to %s:%d when connect ChannelFuture", host, port), e);
}
}
serverListeningChannel = listeningChannel;
try {
final ByteTransportIdentifier identifier = new ByteTransportIdentifier(localExecutorId);
nameResolver.register(identifier, (InetSocketAddress) listeningChannel.localAddress());
} catch (final Exception e) {
LOG.error("Cannot register ByteTransport listening address to the naming registry", e);
throw new RuntimeException(e);
}
LOG.info("ByteTransport server in {} is listening at {}", localExecutorId, listeningChannel.localAddress());
}