private FlowController()

in nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java [473:825]


    /**
     * Builds a FlowController and wires up the repositories, schedulers, flow manager, Python bridge,
     * site-to-site listeners, status/analytics background tasks and (when clustered) the heartbeat,
     * leader-election and load-balancing components.
     *
     * <p>NOTE(review): {@code this} is handed to several collaborators (for example the provenance
     * authorizable factory, process scheduler, flow manager and scheduling agents) before the constructor
     * has finished, so the order of the statements below is presumably significant. Verify before reordering.</p>
     *
     * @param flowFileEventRepo repository of FlowFile events; stored and passed to the repository context
     *            factory, flow manager and event access
     * @param sslContext SSL context passed to the flow manager, the secure RAW site-to-site listener and the
     *            load-balance server/client factory; a {@code null} value disables secure RAW site-to-site
     * @param nifiProperties source of all configuration values read during construction
     * @param authorizer stored, and passed to the provenance repository, controller service resolver and event access
     * @param auditService stored, and passed to event access
     * @param encryptor stored for later use
     * @param configuredForClustering whether the clustering components (heartbeater, leader election, load
     *            balancing and, if configured, the embedded ZooKeeper server) are created and started
     * @param protocolSender used to create the cluster heartbeater when configured for clustering
     * @param bulletinRepo stored, and passed to the controller service provider
     * @param clusterCoordinator stored, and used by the site-to-site node informant, heartbeater and load balancing
     * @param heartbeatMonitor stored, and started when configured for clustering
     * @param leaderElectionManager stored; queried for the current Cluster Coordinator and started when
     *            configured for clustering
     * @param extensionManager stored; used for repository creation, controller services, Python extension
     *            discovery and analytics models
     * @param revisionManager stored for later use
     * @param statusHistoryRepository stored; used by the analytics engine and the periodic status capture task
     * @param ruleViolationsManager may be {@code null}, in which case no flow analyzer is created
     * @param stateManagerProvider stored, and passed to the process scheduler and repository context factory
     * @throws RuntimeException if the Provenance Repository or Content Repository cannot be created
     * @throws IllegalStateException if the Python bridge cannot be started, or the embedded ZooKeeper server
     *             was requested but could not be started
     */
    private FlowController(
            final FlowFileEventRepository flowFileEventRepo,
            final SSLContext sslContext,
            final NiFiProperties nifiProperties,
            final Authorizer authorizer,
            final AuditService auditService,
            final PropertyEncryptor encryptor,
            final boolean configuredForClustering,
            final NodeProtocolSender protocolSender,
            final BulletinRepository bulletinRepo,
            final ClusterCoordinator clusterCoordinator,
            final HeartbeatMonitor heartbeatMonitor,
            final LeaderElectionManager leaderElectionManager,
            final ExtensionDiscoveringManager extensionManager,
            final RevisionManager revisionManager,
            final StatusHistoryRepository statusHistoryRepository,
            final RuleViolationsManager ruleViolationsManager,
            final StateManagerProvider stateManagerProvider
    ) {

        maxTimerDrivenThreads = new AtomicInteger(10);

        this.encryptor = encryptor;
        this.nifiProperties = nifiProperties;
        this.heartbeatMonitor = heartbeatMonitor;
        this.leaderElectionManager = leaderElectionManager;
        this.extensionManager = extensionManager;
        this.clusterCoordinator = clusterCoordinator;
        this.authorizer = authorizer;
        this.auditService = auditService;
        this.configuredForClustering = configuredForClustering;
        this.revisionManager = revisionManager;
        this.statusHistoryRepository = statusHistoryRepository;
        this.stateManagerProvider = stateManagerProvider;

        // The timer-driven engine is sized from the initial max thread count set above
        timerDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxTimerDrivenThreads.get(), "Timer-Driven Process"));

        final FlowFileRepository flowFileRepo = createFlowFileRepository(nifiProperties, extensionManager, resourceClaimManager);
        flowFileRepository = flowFileRepo;
        flowFileEventRepository = flowFileEventRepo;
        counterRepositoryRef = new AtomicReference<>(new StandardCounterRepository());

        // Register the GC log as a listener on every garbage collector bean that can emit notifications
        gcLog = new RingBufferGarbageCollectionLog(1000, 20L);
        for (final GarbageCollectorMXBean mxBean : ManagementFactory.getGarbageCollectorMXBeans()) {
            if (mxBean instanceof NotificationEmitter) {
                ((NotificationEmitter) mxBean).addNotificationListener(gcLog, null, null);
            }
        }

        bulletinRepository = bulletinRepo;

        try {
            this.provenanceAuthorizableFactory = new StandardProvenanceAuthorizableFactory(this);
            this.provenanceRepository = createProvenanceRepository(nifiProperties);

            final IdentifierLookup identifierLookup = new ComponentIdentifierLookup(this);

            this.provenanceRepository.initialize(createEventReporter(), authorizer, provenanceAuthorizableFactory, identifierLookup);
        } catch (final Exception e) {
            throw new RuntimeException("Unable to create Provenance Repository", e);
        }

        try {
            this.contentRepository = createContentRepository(nifiProperties);
        } catch (final Exception e) {
            throw new RuntimeException("Unable to create Content Repository", e);
        }

        lifecycleStateManager = new StandardLifecycleStateManager();
        processScheduler = new StandardProcessScheduler(timerDrivenEngineRef.get(), this, stateManagerProvider, this.nifiProperties, lifecycleStateManager);

        parameterContextManager = new StandardParameterContextManager();
        final long maxAppendableBytes = getMaxAppendableBytes();
        repositoryContextFactory = new RepositoryContextFactory(contentRepository, flowFileRepository, flowFileEventRepository,
            counterRepositoryRef.get(), provenanceRepository, stateManagerProvider, maxAppendableBytes);
        assetManager = createAssetManager(nifiProperties);

        // A flow analyzer is only created when a rule violations manager was supplied
        this.flowAnalysisThreadPool = new FlowEngine(1, "Background Flow Analysis", true);
        if (ruleViolationsManager != null) {
            flowAnalyzer = new StandardFlowAnalyzer(
                ruleViolationsManager,
                this,
                extensionManager
            );
        } else {
            flowAnalyzer = null;
        }

        flowManager = new StandardFlowManager(
                nifiProperties,
                sslContext,
                this,
                flowFileEventRepository,
                parameterContextManager
        );

        controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, flowManager, extensionManager);
        controllerServiceResolver = new StandardControllerServiceResolver(authorizer, flowManager, new NiFiRegistryFlowMapper(extensionManager),
                controllerServiceProvider, new StandardControllerServiceApiLookup(extensionManager));

        // Wrap the Python bridge so that it is paired with the ClassLoader of its concrete implementation class
        final PythonBridge rawPythonBridge = createPythonBridge(nifiProperties, controllerServiceProvider);
        final ClassLoader pythonBridgeClassLoader = rawPythonBridge.getClass().getClassLoader();
        final PythonBridge classloaderAwareBridge = new ClassLoaderAwarePythonBridge(rawPythonBridge, pythonBridgeClassLoader);
        this.pythonBridge = classloaderAwareBridge;

        try {
            pythonBridge.start();
        } catch (final IOException e) {
            throw new IllegalStateException("Failed to communicate with Python Controller", e);
        }
        extensionManager.setPythonBridge(pythonBridge);
        pythonBundle = PythonBundle.create(nifiProperties, pythonBridgeClassLoader);
        extensionManager.discoverPythonExtensions(pythonBundle);

        flowManager.initialize(
                controllerServiceProvider,
                pythonBridge,
                flowAnalyzer,
                ruleViolationsManager
        );
        if (flowAnalyzer != null) {
            flowAnalyzer.initialize(controllerServiceProvider);
        }

        // Both scheduling agents share the timer-driven engine
        final CronSchedulingAgent cronSchedulingAgent = new CronSchedulingAgent(this, timerDrivenEngineRef.get(), repositoryContextFactory);
        final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), repositoryContextFactory, this.nifiProperties);
        processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, timerDrivenAgent);
        processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, cronSchedulingAgent);

        startConnectablesAfterInitialization = new HashSet<>();
        startRemoteGroupPortsAfterInitialization = new HashSet<>();
        startGroupsAfterInitialization = new HashSet<>();

        // Fall back to the default graceful shutdown period when the property is missing, non-numeric or less than 1
        final String gracefulShutdownSecondsVal = nifiProperties.getProperty(GRACEFUL_SHUTDOWN_PERIOD);
        long shutdownSecs;
        try {
            shutdownSecs = Long.parseLong(gracefulShutdownSecondsVal);
            if (shutdownSecs < 1) {
                shutdownSecs = DEFAULT_GRACEFUL_SHUTDOWN_SECONDS;
            }
        } catch (final NumberFormatException nfe) {
            shutdownSecs = DEFAULT_GRACEFUL_SHUTDOWN_SECONDS;
        }
        gracefulShutdownSeconds = shutdownSecs;

        remoteInputSocketPort = nifiProperties.getRemoteInputPort();
        remoteInputHttpPort = nifiProperties.getRemoteInputHttpPort();
        isSiteToSiteSecure = nifiProperties.isSiteToSiteSecure();

        this.heartbeatDelaySeconds = (int) FormatUtils.getTimeDuration(nifiProperties.getNodeHeartbeatInterval(), TimeUnit.SECONDS);

        this.snippetManager = new SnippetManager();
        this.reloadComponent = new StandardReloadComponent(this);

        // Create the root Process Group with a freshly generated identifier and the default name
        final ProcessGroup rootGroup = flowManager.createProcessGroup(ComponentIdGenerator.generateId().toString());
        rootGroup.setName(FlowManager.DEFAULT_ROOT_GROUP_NAME);
        setRootGroup(rootGroup);
        instanceId = ComponentIdGenerator.generateId().toString();

        this.validationThreadPool = new FlowEngine(5, "Validate Components", true);
        this.validationTrigger = new StandardValidationTrigger(validationThreadPool, this::isInitialized);

        if (remoteInputSocketPort == null) {
            LOG.info("Not enabling RAW Socket Site-to-Site functionality because nifi.remote.input.socket.port is not set");
        } else if (isSiteToSiteSecure && sslContext == null) {
            LOG.error("Unable to create Secure Site-to-Site Listener because not all required Keystore/Truststore "
                    + "Properties are set. Site-to-Site functionality will be disabled until this problem has been fixed.");
        } else {
            // Register the SocketFlowFileServerProtocol as the appropriate resource for site-to-site Server Protocol
            RemoteResourceManager.setServerProtocolImplementation(SocketFlowFileServerProtocol.RESOURCE_NAME, SocketFlowFileServerProtocol.class);

            final NodeInformant nodeInformant = configuredForClustering ? new ClusterCoordinatorNodeInformant(clusterCoordinator) : null;
            externalSiteListeners.add(new SocketRemoteSiteListener(remoteInputSocketPort, isSiteToSiteSecure ? sslContext : null, nifiProperties, nodeInformant));
        }

        if (remoteInputHttpPort == null) {
            LOG.debug("Not enabling HTTP(S) Site-to-Site functionality because the '{}' property is not true", NiFiProperties.SITE_TO_SITE_HTTP_ENABLED);
        } else {
            externalSiteListeners.add(HttpRemoteSiteListener.getInstance(nifiProperties));
        }

        for (final RemoteSiteListener listener : externalSiteListeners) {
            listener.setRootGroup(rootGroup);
        }

        // Determine frequency for obtaining component status snapshots
        final String snapshotFrequency = nifiProperties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY);
        long snapshotMillis;
        try {
            snapshotMillis = FormatUtils.getTimeDuration(snapshotFrequency, TimeUnit.MILLISECONDS);
        } catch (final Exception e) {
            // Report the fallback rather than silently ignoring the configured value, as is done for the analytics properties below
            LOG.warn("Invalid value '{}' configured for {}. This property has been set to '{}'",
                    snapshotFrequency, NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY);
            snapshotMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY, TimeUnit.MILLISECONDS);
        }

        // Initialize the Embedded ZooKeeper server, if applicable
        if (nifiProperties.isStartEmbeddedZooKeeper() && configuredForClustering) {
            try {
                zooKeeperStateServer = ZooKeeperStateServer.create(nifiProperties);
                zooKeeperStateServer.start();
            } catch (final IOException | ConfigException e) {
                throw new IllegalStateException("Unable to initialize Flow because NiFi was configured to start an Embedded Zookeeper server but failed to do so", e);
            }
        } else {
            zooKeeperStateServer = null;
        }

        final boolean analyticsEnabled = Boolean.parseBoolean(nifiProperties.getProperty(NiFiProperties.ANALYTICS_PREDICTION_ENABLED, NiFiProperties.DEFAULT_ANALYTICS_PREDICTION_ENABLED));

        if (analyticsEnabled) {

            // Determine interval for predicting future feature values
            final String predictionInterval = nifiProperties.getProperty(NiFiProperties.ANALYTICS_PREDICTION_INTERVAL, NiFiProperties.DEFAULT_ANALYTICS_PREDICTION_INTERVAL);
            long predictionIntervalMillis;
            try {
                predictionIntervalMillis = FormatUtils.getTimeDuration(predictionInterval, TimeUnit.MILLISECONDS);
            } catch (final Exception e) {
                LOG.warn("Analytics is enabled however could not retrieve value for {}. This property has been set to '{}'",
                        NiFiProperties.ANALYTICS_PREDICTION_INTERVAL, NiFiProperties.DEFAULT_ANALYTICS_PREDICTION_INTERVAL);
                predictionIntervalMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_ANALYTICS_PREDICTION_INTERVAL, TimeUnit.MILLISECONDS);
            }

            // Determine interval for querying past observations
            final String queryInterval = nifiProperties.getProperty(NiFiProperties.ANALYTICS_QUERY_INTERVAL, NiFiProperties.DEFAULT_ANALYTICS_QUERY_INTERVAL);
            long queryIntervalMillis;
            try {
                queryIntervalMillis = FormatUtils.getTimeDuration(queryInterval, TimeUnit.MILLISECONDS);
            } catch (final Exception e) {
                LOG.warn("Analytics is enabled however could not retrieve value for {}. This property has been set to '{}'",
                        NiFiProperties.ANALYTICS_QUERY_INTERVAL, NiFiProperties.DEFAULT_ANALYTICS_QUERY_INTERVAL);
                queryIntervalMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_ANALYTICS_QUERY_INTERVAL, TimeUnit.MILLISECONDS);
            }

            // Determine score name to use for evaluating model performance
            String modelScoreName = nifiProperties.getProperty(NiFiProperties.ANALYTICS_CONNECTION_MODEL_SCORE_NAME, NiFiProperties.DEFAULT_ANALYTICS_CONNECTION_SCORE_NAME);

            // Determine score threshold to use when evaluating acceptable model
            Double modelScoreThreshold;
            try {
                modelScoreThreshold = Double.valueOf(nifiProperties.getProperty(NiFiProperties.ANALYTICS_CONNECTION_MODEL_SCORE_THRESHOLD,
                        Double.toString(NiFiProperties.DEFAULT_ANALYTICS_CONNECTION_SCORE_THRESHOLD)));
            } catch (final Exception e) {
                LOG.warn("Analytics is enabled however could not retrieve value for {}. This property has been set to '{}'.",
                        NiFiProperties.ANALYTICS_CONNECTION_MODEL_SCORE_THRESHOLD, NiFiProperties.DEFAULT_ANALYTICS_CONNECTION_SCORE_THRESHOLD);
                modelScoreThreshold = NiFiProperties.DEFAULT_ANALYTICS_CONNECTION_SCORE_THRESHOLD;
            }

            StatusAnalyticsModelMapFactory statusAnalyticsModelMapFactory = new StatusAnalyticsModelMapFactory(extensionManager, nifiProperties);

            analyticsEngine = new CachingConnectionStatusAnalyticsEngine(flowManager, statusHistoryRepository, statusAnalyticsModelMapFactory,
                    predictionIntervalMillis, queryIntervalMillis, modelScoreName, modelScoreThreshold);

            // Refresh and load predictions for every connection, starting immediately and repeating with a 15 second delay
            timerDrivenEngineRef.get().scheduleWithFixedDelay(() -> {
                try {
                    final long startTs = System.currentTimeMillis();
                    RepositoryStatusReport statusReport = flowFileEventRepository.reportTransferEvents(startTs);
                    flowManager.findAllConnections().forEach(connection -> {
                        ConnectionStatusAnalytics connectionStatusAnalytics = ((ConnectionStatusAnalytics) analyticsEngine.getStatusAnalytics(connection.getIdentifier()));
                        connectionStatusAnalytics.refresh();
                        connectionStatusAnalytics.loadPredictions(statusReport);
                    });
                    final long endTs = System.currentTimeMillis();
                    LOG.debug("Time Elapsed for Prediction for loading all predictions: {}", endTs - startTs);
                } catch (final Exception e) {
                    LOG.error("Failed to generate predictions", e);
                }
            }, 0L, 15, TimeUnit.SECONDS);

        }

        eventAccess = new StandardEventAccess(flowManager, flowFileEventRepository, processScheduler, authorizer, provenanceRepository,
                auditService, analyticsEngine, flowFileRepository, contentRepository);

        // Periodically capture node, controller and garbage collection status at the snapshot frequency determined above
        timerDrivenEngineRef.get().scheduleWithFixedDelay(() -> {
            try {
                statusHistoryRepository.capture(getNodeStatusSnapshot(), eventAccess.getControllerStatus(), getGarbageCollectionStatus(), new Date());
            } catch (final Exception e) {
                LOG.error("Failed to capture component stats for Stats History", e);
            }
        }, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS);

        this.connectionStatus = new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED);
        heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false));

        if (configuredForClustering) {
            heartbeater = new ClusterProtocolHeartbeater(protocolSender, clusterCoordinator, leaderElectionManager);

            // Check if there is already a cluster coordinator elected. If not, go ahead
            // and register for coordinator role. If there is already one elected, do not register until
            // we have connected to the cluster. This allows us to avoid becoming the coordinator with a
            // flow that is different from the rest of the cluster (especially an empty flow) and then
            // kicking everyone out. This way, we instead inherit the cluster flow before we attempt to be
            // the coordinator.
            LOG.info("Checking for elected Cluster Coordinator...");
            final Optional<String> clusterCoordinatorLeader = leaderElectionManager.getLeader(ClusterRoles.CLUSTER_COORDINATOR);
            if (!clusterCoordinatorLeader.isPresent()) {
                LOG.info("No Cluster Coordinator elected: Registering for Cluster Coordinator election");
                registerForClusterCoordinator(true);
            } else {
                // At this point, we have determined that there is a Cluster Coordinator elected. It is important to note, though,
                // that if we are running an embedded ZooKeeper, and we have just restarted the cluster (at least the nodes that run the
                // embedded ZooKeeper), that we could possibly determine that the Cluster Coordinator is at an address that is not really
                // valid. This is because the latest stable ZooKeeper does not support "Container ZNodes" and as a result the ZNodes that
                // are created are persistent, not ephemeral. Upon restart, we can get this persisted value, even though the node that belongs
                // to that address has not started. ZooKeeper/Curator will recognize this after a while and delete the ZNode. As a result,
                // we may later determine that there is in fact no Cluster Coordinator. If this happens, we will automatically register for
                // Cluster Coordinator through the StandardFlowService.
                LOG.info("Cluster Coordinator [{}] elected: Not registering for election until after connecting "
                        + "to the cluster and inheriting the flow", clusterCoordinatorLeader.get());
                registerForClusterCoordinator(false);
            }

            leaderElectionManager.start();
            heartbeatMonitor.start();

            final InetSocketAddress loadBalanceAddress = nifiProperties.getClusterLoadBalanceAddress();
            // Setup Load Balancing Server
            final EventReporter eventReporter = createEventReporter();

            final LoadBalanceAuthorizer authorizeConnection = new ClusterLoadBalanceAuthorizer(clusterCoordinator, eventReporter);
            final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(flowFileRepo, contentRepository, provenanceRepository, this, authorizeConnection);

            final int numThreads = nifiProperties.getIntegerProperty(NiFiProperties.LOAD_BALANCE_MAX_THREAD_COUNT, NiFiProperties.DEFAULT_LOAD_BALANCE_MAX_THREAD_COUNT);
            final String timeoutPeriod = nifiProperties.getProperty(NiFiProperties.LOAD_BALANCE_COMMS_TIMEOUT, NiFiProperties.DEFAULT_LOAD_BALANCE_COMMS_TIMEOUT);
            final int timeoutMillis = (int) FormatUtils.getTimeDuration(timeoutPeriod, TimeUnit.MILLISECONDS);

            loadBalanceServer = new ConnectionLoadBalanceServer(loadBalanceAddress.getHostName(), loadBalanceAddress.getPort(), sslContext,
                    numThreads, loadBalanceProtocol, eventReporter, timeoutMillis);


            final int connectionsPerNode = nifiProperties.getIntegerProperty(NiFiProperties.LOAD_BALANCE_CONNECTIONS_PER_NODE, NiFiProperties.DEFAULT_LOAD_BALANCE_CONNECTIONS_PER_NODE);
            final NioAsyncLoadBalanceClientFactory asyncClientFactory = new NioAsyncLoadBalanceClientFactory(sslContext, timeoutMillis, new ContentRepositoryFlowFileAccess(contentRepository),
                    eventReporter, new StandardLoadBalanceFlowFileCodec(), clusterCoordinator);
            loadBalanceClientRegistry = new NioAsyncLoadBalanceClientRegistry(asyncClientFactory, connectionsPerNode);

            // The client thread pool is sized from the same max-thread-count property as the server above
            final int loadBalanceClientThreadCount = nifiProperties.getIntegerProperty(NiFiProperties.LOAD_BALANCE_MAX_THREAD_COUNT, NiFiProperties.DEFAULT_LOAD_BALANCE_MAX_THREAD_COUNT);
            loadBalanceClientThreadPool = new FlowEngine(loadBalanceClientThreadCount, "Load-Balanced Client", true);

            // Submit one client task per thread in the pool
            for (int i = 0; i < loadBalanceClientThreadCount; i++) {
                final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(loadBalanceClientRegistry, clusterCoordinator, eventReporter);
                loadBalanceClientTasks.add(clientTask);
                loadBalanceClientThreadPool.submit(clientTask);
            }
        } else {
            loadBalanceClientRegistry = null;
            heartbeater = null;
            loadBalanceServer = null;
            loadBalanceClientThreadPool = null;
        }

        longRunningTaskMonitorThreadPool = isLongRunningTaskMonitorEnabled()
                ? Optional.of(new FlowEngine(1, "Long Running Task Monitor", true))
                : Optional.empty();
    }