private void setUpPinotController()

in pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java [465:636]


  private void setUpPinotController() {
    // install default SSL context if necessary (even if not force-enabled everywhere)
    TlsConfig tlsDefaults = TlsUtils.extractTlsConfig(_config, ControllerConf.CONTROLLER_TLS_PREFIX);
    if (StringUtils.isNotBlank(tlsDefaults.getKeyStorePath()) || StringUtils.isNotBlank(
        tlsDefaults.getTrustStorePath())) {
      LOGGER.info("Installing default SSL context for any client requests");
      TlsUtils.installDefaultSSLSocketFactory(tlsDefaults);
    }

    // Set up Pinot cluster in Helix if needed
    HelixSetupUtils.setupPinotCluster(_helixClusterName, _helixZkURL, _isUpdateStateModel, _enableBatchMessageMode,
        _config);

    // Start all components
    initPinotFSFactory();
    initControllerFilePathProvider();
    initSegmentFetcherFactory();
    initPinotCrypterFactory();

    LOGGER.info("Initializing QueryRewriterFactory");
    QueryRewriterFactory.init(
        _config.getProperty(CommonConstants.Controller.CONFIG_OF_CONTROLLER_QUERY_REWRITER_CLASS_NAMES));

    LOGGER.info("Initializing Helix participant manager");
    _helixParticipantManager =
        HelixManagerFactory.getZKHelixManager(_helixClusterName, _helixParticipantInstanceId, InstanceType.PARTICIPANT,
            _helixZkURL);

    // LeadControllerManager needs to be initialized before registering as Helix participant.
    LOGGER.info("Initializing lead controller manager");
    _leadControllerManager =
        new LeadControllerManager(_helixControllerInstanceId, _helixParticipantManager, _controllerMetrics);

    LOGGER.info("Registering and connecting Helix participant manager as Helix Participant role");
    registerAndConnectAsHelixParticipant();

    // LeadControllerManager needs to be started after the connection
    // as it can check Helix leadership and resource config only after connecting to Helix cluster.
    LOGGER.info("Starting lead controller manager");
    _leadControllerManager.start();

    LOGGER.info("Starting Pinot Helix resource manager and connecting to Zookeeper");
    _helixResourceManager.start(_helixParticipantManager, _controllerMetrics);

    // Initialize segment lifecycle event listeners
    PinotSegmentLifecycleEventListenerManager.getInstance().init(_helixParticipantManager);

    LOGGER.info("Starting task resource manager");
    _helixTaskResourceManager =
        new PinotHelixTaskResourceManager(_helixResourceManager, new TaskDriver(_helixParticipantManager),
            _config.getPinotTaskExpireTimeInMs());

    // Helix resource manager must be started in order to create PinotLLCRealtimeSegmentManager
    LOGGER.info("Starting realtime segment manager");
    _pinotLLCRealtimeSegmentManager = createPinotLLCRealtimeSegmentManager();
    // TODO: Need to put this inside HelixResourceManager when HelixControllerLeadershipManager is removed.
    _helixResourceManager.registerPinotLLCRealtimeSegmentManager(_pinotLLCRealtimeSegmentManager);

    SegmentCompletionConfig segmentCompletionConfig = new SegmentCompletionConfig(_config);

    _segmentCompletionManager =
        new SegmentCompletionManager(_helixParticipantManager, _pinotLLCRealtimeSegmentManager, _controllerMetrics,
            _leadControllerManager, _config.getSegmentCommitTimeoutSeconds(), segmentCompletionConfig);

    _sqlQueryExecutor = new SqlQueryExecutor(_config.generateVipUrl());

    _connectionManager = PoolingHttpClientConnectionManagerHelper.createWithSocketFactory();
    _connectionManager.setDefaultSocketConfig(
        SocketConfig.custom()
            .setSoTimeout(Timeout.of(_config.getServerAdminRequestTimeoutSeconds() * 1000, TimeUnit.MILLISECONDS))
            .build());
    _tableSizeReader =
        new TableSizeReader(_executorService, _connectionManager, _controllerMetrics, _helixResourceManager,
            _leadControllerManager);
    _helixResourceManager.registerTableSizeReader(_tableSizeReader);
    _storageQuotaChecker = new StorageQuotaChecker(_tableSizeReader, _controllerMetrics, _leadControllerManager,
        _helixResourceManager, _config);

    _diskUtilizationChecker = new DiskUtilizationChecker(_helixResourceManager, _config);
    _resourceUtilizationManager = new ResourceUtilizationManager(_config, _diskUtilizationChecker);

    // Setting up periodic tasks
    List<PeriodicTask> controllerPeriodicTasks = setupControllerPeriodicTasks();
    LOGGER.info("Init controller periodic tasks scheduler");
    _periodicTaskScheduler = new PeriodicTaskScheduler();
    _periodicTaskScheduler.init(controllerPeriodicTasks);
    _periodicTaskScheduler.start();

    // Register message handler for incoming user-defined helix messages.
    _helixParticipantManager.getMessagingService()
        .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
            new ControllerUserDefinedMessageHandlerFactory(_periodicTaskScheduler));

    String accessControlFactoryClass = _config.getAccessControlFactoryClass();
    LOGGER.info("Use class: {} as the AccessControlFactory", accessControlFactoryClass);
    final AccessControlFactory accessControlFactory;
    try {
      accessControlFactory = (AccessControlFactory) Class.forName(accessControlFactoryClass).newInstance();
      accessControlFactory.init(_config, _helixResourceManager);
    } catch (Exception e) {
      throw new RuntimeException("Caught exception while creating new AccessControlFactory instance", e);
    }

    final MetadataEventNotifierFactory metadataEventNotifierFactory =
        MetadataEventNotifierFactory.loadFactory(_config.subset(METADATA_EVENT_NOTIFIER_PREFIX), _helixResourceManager);

    LOGGER.info("Controller download url base: {}", _config.generateVipUrl());
    LOGGER.info("Injecting configuration and resource managers to the API context");
    // register all the controller objects for injection to jersey resources
    Instant controllerStartTime = Instant.now();
    _adminApp.registerBinder(new AbstractBinder() {
      @Override
      protected void configure() {
        bind(_config).to(ControllerConf.class);
        bind(_helixParticipantInstanceId).named(CONTROLLER_INSTANCE_ID);
        bind(_helixResourceManager).to(PinotHelixResourceManager.class);
        bind(_helixTaskResourceManager).to(PinotHelixTaskResourceManager.class);
        bind(_segmentCompletionManager).to(SegmentCompletionManager.class);
        bind(_taskManager).to(PinotTaskManager.class);
        bind(_taskManagerStatusCache).to(TaskManagerStatusCache.class);
        bind(_connectionManager).to(HttpClientConnectionManager.class);
        bind(_executorService).to(Executor.class);
        bind(_controllerMetrics).to(ControllerMetrics.class);
        bind(accessControlFactory).to(AccessControlFactory.class);
        bind(metadataEventNotifierFactory).to(MetadataEventNotifierFactory.class);
        bind(_leadControllerManager).to(LeadControllerManager.class);
        bind(_periodicTaskScheduler).to(PeriodicTaskScheduler.class);
        bind(_sqlQueryExecutor).to(SqlQueryExecutor.class);
        bind(_pinotLLCRealtimeSegmentManager).to(PinotLLCRealtimeSegmentManager.class);
        bind(_tenantRebalancer).to(TenantRebalancer.class);
        bind(_tableSizeReader).to(TableSizeReader.class);
        bind(_storageQuotaChecker).to(StorageQuotaChecker.class);
        bind(_diskUtilizationChecker).to(DiskUtilizationChecker.class);
        bind(_resourceUtilizationManager).to(ResourceUtilizationManager.class);
        bind(controllerStartTime).named(ControllerAdminApiApplication.START_TIME);
        String loggerRootDir = _config.getProperty(CommonConstants.Controller.CONFIG_OF_LOGGER_ROOT_DIR);
        if (loggerRootDir != null) {
          bind(new LocalLogFileServer(loggerRootDir)).to(LogFileServer.class);
        } else {
          bind(new DummyLogFileServer()).to(LogFileServer.class);
        }
      }
    });

    LOGGER.info("Starting controller admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs));
    _adminApp.start(_listenerConfigs);

    enforceTableConfigAndSchema();

    _controllerMetrics.addCallbackGauge("dataDir.exists", () -> new File(_config.getDataDir()).exists() ? 1L : 0L);
    _controllerMetrics.addCallbackGauge("dataDir.fileOpLatencyMs", () -> {
      File dataDir = new File(_config.getDataDir());
      if (dataDir.exists()) {
        try {
          long startTime = System.currentTimeMillis();
          File testFile = new File(dataDir, _config.getControllerHost());
          try (OutputStream outputStream = new FileOutputStream(testFile, false)) {
            outputStream.write(Longs.toByteArray(System.currentTimeMillis()));
          }
          FileUtils.deleteQuietly(testFile);
          return System.currentTimeMillis() - startTime;
        } catch (IOException e) {
          LOGGER.warn("Caught exception while checking the data directory operation latency", e);
          return DATA_DIRECTORY_EXCEPTION_VALUE;
        }
      } else {
        return DATA_DIRECTORY_MISSING_VALUE;
      }
    });

    _serviceStatusCallbackList.add(generateServiceStatusCallback(_helixParticipantManager));
  }