private void startBackgroundReceive()

in nailgun-server/src/main/java/com/facebook/nailgun/NGCommunicator.java [184:266]


  /**
   * Submits the orchestrator task to {@code orchestratorExecutor}.
   *
   * <p>The task loops until {@code shutdown} is seen while holding the {@code orchestratorEvent}
   * monitor. Each iteration submits a {@code readChunk()} call to {@code readExecutor}, waits for
   * its result and calls {@code notifyHeartbeat()} when the chunk type is a heartbeat. Once the
   * loop ends, either normally or through an exception, the task clears {@code clientConnected},
   * calls {@code setEof()} and hands the disconnect reason to {@code
   * waitTerminationAndNotifyClients}. The reason stays {@code INTERNAL_ERROR} unless one of the
   * exception handlers below picks a more specific one.
   */
  private void startBackgroundReceive() {
    // The socket is expected to enforce the read timeout (heartbeats included), but the Java
    // Socket/Stream API does not guarantee that, so the wait on the read future is bounded too.
    // The bound is roughly 110% of the heartbeat timeout so the socket gets to time out first.
    final long readWaitMillis = heartbeatTimeoutMillis + heartbeatTimeoutMillis / 10;

    orchestratorExecutor.submit(
        () -> {
          NGClientDisconnectReason disconnectReason = NGClientDisconnectReason.INTERNAL_ERROR;
          try {
            LOG.log(Level.FINE, "Orchestrator thread started");
            for (; ; ) {
              // Stays null only when shutdown was observed under the monitor.
              Future<Byte> pendingRead = null;
              synchronized (orchestratorEvent) {
                if (!shutdown) {
                  pendingRead =
                      readExecutor.submit(
                          () -> {
                            try {
                              return readChunk();
                            } catch (IOException e) {
                              throw new ExecutionException(e);
                            }
                          });
                }
              }
              if (pendingRead == null) {
                break;
              }

              // A non-positive bound means the wait is unbounded.
              byte chunkType;
              if (readWaitMillis > 0) {
                chunkType = pendingRead.get(readWaitMillis, TimeUnit.MILLISECONDS);
              } else {
                chunkType = pendingRead.get();
              }

              if (chunkType == NGConstants.CHUNKTYPE_HEARTBEAT) {
                notifyHeartbeat();
              }
            }
          } catch (TimeoutException e) {
            // The bounded wait on the read future expired.
            LOG.log(
                Level.WARNING,
                "Nailgun client read future timed out after " + readWaitMillis + " ms",
                e);
            disconnectReason = NGClientDisconnectReason.HEARTBEAT;
          } catch (ExecutionException e) {
            // The read task itself failed; classify by the cause reported by getCause().
            Throwable rootCause = getCause(e);
            if (rootCause instanceof SocketTimeoutException) {
              LOG.log(
                  Level.WARNING,
                  "Nailgun client socket timed out after " + heartbeatTimeoutMillis + " ms",
                  rootCause);
              disconnectReason = NGClientDisconnectReason.SOCKET_TIMEOUT;
            } else if (rootCause instanceof EOFException) {
              // DataInputStream throws EOFException when the stream is terminated;
              // nothing else to do but leave the main orchestrator loop.
              disconnectReason = NGClientDisconnectReason.SOCKET_ERROR;
              LOG.log(Level.FINE, "Socket is disconnected");
            } else {
              LOG.log(Level.WARNING, "Nailgun client read future raised an exception", rootCause);
            }
          } catch (InterruptedException e) {
            LOG.log(Level.WARNING, "NGCommunicator orchestrator was interrupted", e);
          } catch (Throwable e) {
            LOG.log(Level.WARNING, "Nailgun orchestrator gets an exception ", e);
          }

          LOG.log(Level.FINE, "Nailgun client disconnected");

          // Mark the client as disconnected.
          clientConnected.set(false);

          // Let stream readers know that no more data will arrive.
          setEof();

          // The orchestrator thread stays alive until close() signals it to stop: it remains
          // responsible for reporting the disconnect to listeners that are attached only after
          // the disconnect actually happened.
          waitTerminationAndNotifyClients(disconnectReason);

          LOG.log(Level.FINE, "Orchestrator thread finished");
        });
  }