in nailgun-server/src/main/java/com/facebook/nailgun/NGCommunicator.java [184:266]
/**
 * Submits the orchestrator task that keeps reading chunks from the client in the background.
 *
 * <p>The task repeatedly schedules {@code readChunk()} on the read executor and waits for the
 * result, calling {@code notifyHeartbeat()} for every heartbeat chunk. The loop ends when the
 * {@code shutdown} flag is observed or when a read fails or times out. Afterwards the client is
 * marked as disconnected, EOF is signalled to stream readers, and the task hands the disconnect
 * reason to {@code waitTerminationAndNotifyClients}.
 */
private void startBackgroundReceive() {
  // The socket is expected to enforce the read timeout (heartbeats included), but the Java
  // Socket/Stream API does not guarantee that. As a safety net every read is also awaited through
  // a future with its own timeout, set to roughly 110% of the heartbeat timeout so that the
  // socket timeout gets the chance to fire first.
  final long readDeadlineMillis = heartbeatTimeoutMillis + heartbeatTimeoutMillis / 10;

  Runnable orchestrator =
      () -> {
        // Stays INTERNAL_ERROR unless one of the handlers below identifies a more specific cause.
        NGClientDisconnectReason disconnectReason = NGClientDisconnectReason.INTERNAL_ERROR;
        try {
          LOG.log(Level.FINE, "Orchestrator thread started");
          for (; ; ) {
            Future<Byte> pendingRead;
            // The shutdown check and the scheduling of the next read happen under the same lock.
            synchronized (orchestratorEvent) {
              if (shutdown) {
                break;
              }
              pendingRead =
                  readExecutor.submit(
                      () -> {
                        try {
                          return readChunk();
                        } catch (IOException ioFailure) {
                          throw new ExecutionException(ioFailure);
                        }
                      });
            }

            // A non-positive deadline means the read is awaited without any future timeout.
            byte chunkType;
            if (readDeadlineMillis > 0) {
              chunkType = pendingRead.get(readDeadlineMillis, TimeUnit.MILLISECONDS);
            } else {
              chunkType = pendingRead.get();
            }

            if (chunkType == NGConstants.CHUNKTYPE_HEARTBEAT) {
              notifyHeartbeat();
            }
          }
        } catch (InterruptedException interrupted) {
          LOG.log(Level.WARNING, "NGCommunicator orchestrator was interrupted", interrupted);
        } catch (TimeoutException futureTimedOut) {
          // The future-level safety net fired before any result or failure was delivered.
          disconnectReason = NGClientDisconnectReason.HEARTBEAT;
          LOG.log(
              Level.WARNING,
              "Nailgun client read future timed out after " + readDeadlineMillis + " ms",
              futureTimedOut);
        } catch (ExecutionException failedRead) {
          Throwable rootCause = getCause(failedRead);
          if (rootCause instanceof SocketTimeoutException) {
            disconnectReason = NGClientDisconnectReason.SOCKET_TIMEOUT;
            LOG.log(
                Level.WARNING,
                "Nailgun client socket timed out after " + heartbeatTimeoutMillis + " ms",
                rootCause);
          } else if (rootCause instanceof EOFException) {
            // DataInputStream reports a terminated stream with EOFException; nothing else to do
            // here except leave the orchestrator loop.
            disconnectReason = NGClientDisconnectReason.SOCKET_ERROR;
            LOG.log(Level.FINE, "Socket is disconnected");
          } else {
            LOG.log(Level.WARNING, "Nailgun client read future raised an exception", rootCause);
          }
        } catch (Throwable unexpected) {
          LOG.log(Level.WARNING, "Nailgun orchestrator gets an exception ", unexpected);
        }

        LOG.log(Level.FINE, "Nailgun client disconnected");
        // Record that the client is gone.
        clientConnected.set(false);
        // Tell stream readers that no more data will arrive.
        setEof();
        // The orchestrator thread stays alive until close() signals it to stop: it is still
        // responsible for reporting the disconnect to listeners that get attached after the
        // disconnect has already happened.
        waitTerminationAndNotifyClients(disconnectReason);
        LOG.log(Level.FINE, "Orchestrator thread finished");
      };

  orchestratorExecutor.submit(orchestrator);
}