in nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java [96:317]
/**
 * Starts the Site-to-Site socket listener.
 *
 * <p>Clears the {@code stopped} flag and launches a thread named "Site-to-Site Listener". That thread
 * opens a server socket via {@code createServerSocket()}, accepts connections until {@code stopped}
 * becomes true, and hands each accepted socket to a dedicated "Site-to-Site Worker Thread-N" thread.
 * When the accept loop ends, every worker thread still tracked is interrupted.</p>
 *
 * <p>Each worker verifies the magic bytes, negotiates a {@code ServerProtocol}, performs the handshake
 * and then services requests until the protocol reports shutdown or an error occurs.</p>
 *
 * @throws IOException declared by the signature; the body shown here performs all socket I/O on the
 *                     listener/worker threads, where failures are logged rather than propagated
 */
public void start() throws IOException {
    final boolean secure = (sslContext != null);
    // Only touched by the listener thread: appended to, pruned and finally iterated inside its run().
    final List<Thread> threads = new ArrayList<>();

    stopped.set(false);

    final Thread listenerThread = new Thread(new Runnable() {
        private int threadCount = 0;

        @Override
        public void run() {
            try (final ServerSocket serverSocket = createServerSocket()) {
                // Bound accept() so the stopped flag is re-checked regularly.
                serverSocket.setSoTimeout(2000);

                while (!stopped.get()) {
                    // A null result means no connection was obtained this round (presumably an accept timeout).
                    final Socket acceptedSocket = acceptConnection(serverSocket);
                    if (acceptedSocket == null) {
                        continue;
                    }

                    if (stopped.get()) {
                        // Stop was requested while accepting: release the socket instead of abandoning it open.
                        closeSocketQuietly(acceptedSocket);
                        break;
                    }

                    final Thread thread = createWorkerThread(acceptedSocket);
                    thread.setName("Site-to-Site Worker Thread-" + (threadCount++));
                    LOG.debug("Handing connection to {}", thread);
                    thread.start();

                    threads.add(thread);
                    // Drop finished workers so the list does not grow without bound.
                    threads.removeIf(t -> !t.isAlive());
                }
            } catch (final IOException e) {
                LOG.error("Unable to open server socket", e);
            }

            // Listener is exiting: interrupt any workers that are still tracked.
            for (Thread thread : threads) {
                if (thread != null) {
                    thread.interrupt();
                }
            }
        }

        /**
         * Closes the given socket, logging (at debug) rather than propagating any failure.
         */
        private void closeSocketQuietly(final Socket socketToClose) {
            try {
                socketToClose.close();
            } catch (final IOException closeFailure) {
                LOG.debug("Failed to close socket {}", socketToClose, closeFailure);
            }
        }

        /**
         * Builds (but does not start) the thread that services a single accepted connection.
         */
        private Thread createWorkerThread(Socket socket) {
            return new Thread(new Runnable() {
                @Override
                public void run() {
                    LOG.debug("{} Determining URL of connection", this);
                    final InetAddress inetAddress = socket.getInetAddress();
                    String clientHostName = inetAddress.getHostName();
                    // Strip a "/address" component if the host name carries one.
                    final int slashIndex = clientHostName.indexOf("/");
                    if (slashIndex == 0) {
                        clientHostName = clientHostName.substring(1);
                    } else if (slashIndex > 0) {
                        clientHostName = clientHostName.substring(0, slashIndex);
                    }

                    final int clientPort = socket.getPort();
                    final String peerUri = "nifi://" + clientHostName + ":" + clientPort;
                    LOG.debug("{} Connection URL is {}", this, peerUri);

                    final CommunicationsSession commsSession;
                    final String dn;
                    try {
                        if (secure) {
                            LOG.trace("{} Connection is secure", this);
                            final SSLSocket sslSocket = (SSLSocket) socket;
                            dn = getPeerIdentity(sslSocket);
                            commsSession = new SocketCommunicationsSession(socket);
                            commsSession.setUserDn(dn);
                        } else {
                            LOG.trace("{} Connection is not secure", this);
                            commsSession = new SocketCommunicationsSession(socket);
                            dn = null;
                        }
                    } catch (final Exception e) {
                        // TODO: Add SocketProtocolListener#handleTlsError logic here
                        String msg = String.format("RemoteSiteListener Unable to accept connection from %s due to %s", socket, e.getLocalizedMessage());
                        // Suppress repeated TLS errors
                        if (isTlsError(e)) {
                            boolean printedAsWarning = handleTlsError(msg);

                            // TODO: Move into handleTlsError and refactor shared behavior
                            // If the error was printed as a warning, reset the last seen timer
                            if (printedAsWarning) {
                                tlsErrorLastSeen = System.currentTimeMillis();
                            }
                        } else {
                            LOG.error(msg);
                            // Stack trace is only emitted when debug logging is enabled.
                            if (LOG.isDebugEnabled()) {
                                LOG.error("", e);
                            }
                        }
                        // No session owns the socket at this point, so close it here to avoid leaking it.
                        closeSocketQuietly(socket);
                        return;
                    }

                    LOG.info("Received connection from {}, User DN: {}", socket.getInetAddress(), dn);

                    final InputStream socketIn;
                    final OutputStream socketOut;

                    try {
                        socketIn = commsSession.getInput().getInputStream();
                        socketOut = commsSession.getOutput().getOutputStream();
                    } catch (final IOException e) {
                        LOG.error("Connection dropped from {} before any data was transmitted", peerUri);
                        try {
                            commsSession.close();
                        } catch (final IOException ignored) {
                            // Best effort: the connection is already considered dropped.
                        }

                        return;
                    }

                    final DataInputStream dis = new DataInputStream(socketIn);
                    final DataOutputStream dos = new DataOutputStream(socketOut);

                    ServerProtocol protocol = null;
                    Peer peer = null;
                    try {
                        // ensure that we are communicating with another NiFi
                        LOG.debug("Verifying magic bytes...");
                        verifyMagicBytes(dis, peerUri);

                        LOG.debug("Receiving Server Protocol Negotiation");
                        protocol = RemoteResourceFactory.receiveServerProtocolNegotiation(dis, dos);
                        protocol.setRootProcessGroup(rootGroup.get());
                        protocol.setNodeInformant(nodeInformant);
                        if (protocol instanceof PeerDescriptionModifiable) {
                            ((PeerDescriptionModifiable) protocol).setPeerDescriptionModifier(peerDescriptionModifier);
                        }

                        final PeerDescription description = new PeerDescription(clientHostName, clientPort, sslContext != null);
                        peer = new Peer(description, commsSession, peerUri, "nifi://localhost:" + getPort());
                        LOG.debug("Handshaking....");
                        protocol.handshake(peer);

                        if (!protocol.isHandshakeSuccessful()) {
                            LOG.error("Handshake failed with {}; closing connection", peer);
                            try {
                                peer.close();
                            } catch (final IOException e) {
                                LOG.warn("Failed to close {} due to {}", peer, e);
                            }

                            // No explicit protocol shutdown is performed here; note that the finally block
                            // below still runs on this return and invokes protocol.shutdown(peer) and peer.close().
                            return;
                        }

                        commsSession.setTimeout((int) protocol.getRequestExpiration());
                        LOG.info("Successfully negotiated ServerProtocol {} Version {} with {}",
                                protocol.getResourceName(), protocol.getVersionNegotiator().getVersion(), peer);

                        try {
                            while (!protocol.isShutdown()) {
                                LOG.trace("Getting Protocol Request Type...");

                                int timeoutCount = 0;
                                RequestType requestType = null;

                                while (requestType == null) {
                                    try {
                                        requestType = protocol.getRequestType(peer);
                                    } catch (final SocketTimeoutException e) {
                                        // Give the timeout a bit longer (twice as long) to receive the Request Type,
                                        // in order to attempt to receive more data without shutting down the socket if we don't
                                        // have to.
                                        LOG.debug("{} Timed out waiting to receive RequestType using {} with {}", this, protocol, peer);
                                        timeoutCount++;
                                        requestType = null;

                                        if (timeoutCount >= 2) {
                                            throw e;
                                        }
                                    }
                                }

                                handleRequest(protocol, peer, requestType);
                            }
                            LOG.debug("Finished communicating with {} ({})", peer, protocol);
                        } catch (final Exception e) {
                            LOG.error("Unable to communicate with remote instance {} ({}); closing connection", peer, protocol, e);
                        }
                    } catch (final IOException e) {
                        LOG.error("Unable to communicate with remote instance {}; closing connection", peer, e);
                    } catch (final Throwable t) {
                        LOG.error("Handshake failed when communicating with {}; closing connection.", peerUri, t);
                    } finally {
                        LOG.trace("Cleaning up");
                        // Shut the protocol down first, then close the peer; each step is attempted independently.
                        try {
                            if (protocol != null && peer != null) {
                                protocol.shutdown(peer);
                            }
                        } catch (final Exception protocolException) {
                            LOG.warn("Failed to shutdown protocol", protocolException);
                        }

                        try {
                            if (peer != null) {
                                peer.close();
                            }
                        } catch (final Exception peerException) {
                            LOG.warn("Failed to close peer; some resources may not be appropriately cleaned up", peerException);
                        }
                        LOG.trace("Finished cleaning up");
                    }
                }
            });
        }
    });
    listenerThread.setName("Site-to-Site Listener");
    listenerThread.start();
}