public void start()

in nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java [96:317]


    /**
     * Starts the Site-to-Site listener.
     * <p>
     * Clears the {@code stopped} flag and launches a thread named "Site-to-Site Listener". That thread
     * opens the server socket, accepts connections until {@code stopped} is set, and hands every accepted
     * socket to its own "Site-to-Site Worker Thread-N". Each worker verifies the magic bytes, negotiates a
     * {@link ServerProtocol}, performs the handshake and then services requests until the protocol is shut
     * down. When the accept loop ends, all worker threads that are still tracked are interrupted.
     * <p>
     * This method returns as soon as the listener thread has been started; failures to open or use the
     * server socket are logged from the listener thread rather than propagated to the caller.
     *
     * @throws IOException declared by the signature; nothing in this method body throws it directly
     */
    public void start() throws IOException {
        final boolean secure = (sslContext != null);
        // Only touched by the listener thread (add / prune / interrupt), so a plain ArrayList suffices.
        final List<Thread> threads = new ArrayList<>();

        stopped.set(false);

        final Thread listenerThread = new Thread(new Runnable() {
            // Used solely to give each worker thread a unique name.
            private int threadCount = 0;

            @Override
            public void run() {

                try (final ServerSocket serverSocket = createServerSocket()) {
                    // Bound the blocking accept so the stopped flag is re-checked periodically.
                    serverSocket.setSoTimeout(2000);

                    while (!stopped.get()) {

                        final Socket acceptedSocket = acceptConnection(serverSocket);

                        // No connection was produced by this attempt; loop around and re-check the stopped flag.
                        if (acceptedSocket == null) {
                            continue;
                        }

                        if (stopped.get()) {
                            break;
                        }

                        final Thread thread = createWorkerThread(acceptedSocket);
                        thread.setName("Site-to-Site Worker Thread-" + (threadCount++));
                        LOG.debug("Handing connection to {}", thread);
                        thread.start();
                        threads.add(thread);
                        // Drop finished workers so the list does not grow without bound.
                        threads.removeIf(t -> !t.isAlive());

                    }

                } catch (final IOException e) {
                    LOG.error("Unable to open server socket", e);
                }


                // The accept loop has ended: ask every worker that is still tracked to stop.
                for (Thread thread : threads) {
                    if (thread != null) {
                        thread.interrupt();
                    }
                }
            }

            /**
             * Builds (but does not start) the thread that services a single accepted connection.
             *
             * @param socket the accepted client socket; the worker is responsible for releasing it
             * @return an unstarted worker thread
             */
            private Thread createWorkerThread(Socket socket) {
                return new Thread(new Runnable() {
                    @Override
                    public void run() {
                        LOG.debug("{} Determining URL of connection", this);
                        final InetAddress inetAddress = socket.getInetAddress();
                        String clientHostName = inetAddress.getHostName();
                        // Strip a leading "/" or anything following an embedded "/" from the host name.
                        final int slashIndex = clientHostName.indexOf("/");
                        if (slashIndex == 0) {
                            clientHostName = clientHostName.substring(1);
                        } else if (slashIndex > 0) {
                            clientHostName = clientHostName.substring(0, slashIndex);
                        }

                        final int clientPort = socket.getPort();
                        final String peerUri = "nifi://" + clientHostName + ":" + clientPort;
                        LOG.debug("{} Connection URL is {}", this, peerUri);

                        final CommunicationsSession commsSession;
                        final String dn;
                        try {
                            if (secure) {
                                LOG.trace("{} Connection is secure", this);
                                final SSLSocket sslSocket = (SSLSocket) socket;
                                dn = getPeerIdentity(sslSocket);

                                commsSession = new SocketCommunicationsSession(socket);
                                commsSession.setUserDn(dn);

                            } else {
                                LOG.trace("{} Connection is not secure", this);
                                commsSession = new SocketCommunicationsSession(socket);
                                dn = null;
                            }
                        } catch (final Exception e) {
                            // TODO: Add SocketProtocolListener#handleTlsError logic here
                            String msg = String.format("RemoteSiteListener Unable to accept connection from %s due to %s", socket, e.getLocalizedMessage());
                            // Suppress repeated TLS errors
                            if (isTlsError(e)) {
                                boolean printedAsWarning = handleTlsError(msg);

                                // TODO: Move into handleTlsError and refactor shared behavior
                                // If the error was printed as a warning, reset the last seen timer
                                if (printedAsWarning) {
                                    tlsErrorLastSeen = System.currentTimeMillis();
                                }
                            } else {
                                LOG.error(msg);
                                if (LOG.isDebugEnabled()) {
                                    LOG.error("", e);
                                }
                            }

                            // The session was never handed to anyone, so nothing else will release the
                            // accepted socket; close it here to avoid leaking the connection.
                            try {
                                socket.close();
                            } catch (final IOException closeException) {
                                LOG.debug("Failed to close rejected connection {}", socket, closeException);
                            }
                            return;
                        }

                        LOG.info("Received connection from {}, User DN: {}", socket.getInetAddress(), dn);

                        final InputStream socketIn;
                        final OutputStream socketOut;

                        try {
                            socketIn = commsSession.getInput().getInputStream();
                            socketOut = commsSession.getOutput().getOutputStream();
                        } catch (final IOException e) {
                            LOG.error("Connection dropped from {} before any data was transmitted", peerUri);
                            try {
                                commsSession.close();
                            } catch (final IOException ignored) {
                                // Best effort: the connection is already unusable.
                            }

                            return;
                        }

                        final DataInputStream dis = new DataInputStream(socketIn);
                        final DataOutputStream dos = new DataOutputStream(socketOut);

                        ServerProtocol protocol = null;
                        Peer peer = null;
                        try {
                            // ensure that we are communicating with another NiFi
                            LOG.debug("Verifying magic bytes...");
                            verifyMagicBytes(dis, peerUri);

                            LOG.debug("Receiving Server Protocol Negotiation");
                            protocol = RemoteResourceFactory.receiveServerProtocolNegotiation(dis, dos);
                            protocol.setRootProcessGroup(rootGroup.get());
                            protocol.setNodeInformant(nodeInformant);
                            if (protocol instanceof PeerDescriptionModifiable) {
                                ((PeerDescriptionModifiable) protocol).setPeerDescriptionModifier(peerDescriptionModifier);
                            }

                            final PeerDescription description = new PeerDescription(clientHostName, clientPort, sslContext != null);
                            peer = new Peer(description, commsSession, peerUri, "nifi://localhost:" + getPort());
                            LOG.debug("Handshaking....");
                            protocol.handshake(peer);

                            if (!protocol.isHandshakeSuccessful()) {
                                LOG.error("Handshake failed with {}; closing connection", peer);
                                try {
                                    peer.close();
                                } catch (final IOException e) {
                                    LOG.warn("Failed to close {} due to {}", peer, e);
                                }

                                // no need to shutdown protocol because we failed to perform handshake
                                return;
                            }

                            commsSession.setTimeout((int) protocol.getRequestExpiration());

                            LOG.info("Successfully negotiated ServerProtocol {} Version {} with {}",
                                protocol.getResourceName(), protocol.getVersionNegotiator().getVersion(), peer);

                            try {
                                while (!protocol.isShutdown()) {
                                    LOG.trace("Getting Protocol Request Type...");

                                    int timeoutCount = 0;
                                    RequestType requestType = null;

                                    while (requestType == null) {
                                        try {
                                            requestType = protocol.getRequestType(peer);
                                        } catch (final SocketTimeoutException e) {
                                            // Give the timeout a bit longer (twice as long) to receive the Request Type,
                                            // in order to attempt to receive more data without shutting down the socket if we don't
                                            // have to.
                                            LOG.debug("{} Timed out waiting to receive RequestType using {} with {}", this, protocol, peer);
                                            timeoutCount++;
                                            requestType = null;

                                            if (timeoutCount >= 2) {
                                                throw e;
                                            }
                                        }
                                    }

                                    handleRequest(protocol, peer, requestType);
                                }
                                LOG.debug("Finished communicating with {} ({})", peer, protocol);
                            } catch (final Exception e) {
                                LOG.error("Unable to communicate with remote instance {} ({}); closing connection", peer, protocol, e);
                            }
                        } catch (final IOException e) {
                            LOG.error("Unable to communicate with remote instance {}; closing connection", peer, e);
                        } catch (final Throwable t) {
                            LOG.error("Handshake failed when communicating with {}; closing connection.", peerUri, t);
                        } finally {
                            LOG.trace("Cleaning up");
                            try {
                                if (protocol != null && peer != null) {
                                    protocol.shutdown(peer);
                                }
                            } catch (final Exception protocolException) {
                                LOG.warn("Failed to shutdown protocol", protocolException);
                            }

                            try {
                                if (peer != null) {
                                    peer.close();
                                } else {
                                    // Failure happened before a Peer was created (e.g. bad magic bytes or failed
                                    // protocol negotiation), so the session must be closed directly or the
                                    // underlying connection would never be released.
                                    commsSession.close();
                                }
                            } catch (final Exception peerException) {
                                LOG.warn("Failed to close peer; some resources may not be appropriately cleaned up", peerException);
                            }
                            LOG.trace("Finished cleaning up");
                        }
                    }
                });
            }
        });

        listenerThread.setName("Site-to-Site Listener");
        listenerThread.start();
    }