public void start()

in pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java [595:815]


  /**
   * Starts the Pinot server and blocks until it is ready to serve queries (or the optional startup status check fails).
   *
   * <p>In order, this method:
   * <ol>
   *   <li>installs a default SSL context when a TLS key store or trust store is configured;</li>
   *   <li>instantiates the configured {@code AccessControlFactory};</li>
   *   <li>initializes the realtime lucene searcher pool and refresh manager;</li>
   *   <li>builds the segment preprocess/star-tree/download throttlers (relaxed until queries are served);</li>
   *   <li>creates the {@code ServerInstance}, registers the segment state model factory and connects to Helix;</li>
   *   <li>registers cluster-config and instance-config change listeners, starts the admin API and registers the
   *       user-defined message handler factory;</li>
   *   <li>optionally runs the startup service status check, bounded by the configured startup timeout;</li>
   *   <li>starts the query server, clears the shutdown-in-progress flag in the instance config, marks the server ready
   *       and enables realtime consumption throttling;</li>
   *   <li>registers memory, ZK, queries-disabled, consumer-directory and startup-duration metrics.</li>
   * </ol>
   *
   * @throws Exception if connecting to Helix, the startup service status check, or another initialization step fails.
   *     If the service status check fails, the admin API is stopped and Helix is disconnected before rethrowing; any
   *     failure during that cleanup is attached to the original exception as a suppressed exception.
   */
  public void start()
      throws Exception {
    LOGGER.info("Starting Pinot server (Version: {})", PinotVersion.VERSION);
    LOGGER.info("Server configs: {}", new PinotAppConfigs(getConfig()).toJSONString());
    long startTimeMs = System.currentTimeMillis();

    // install default SSL context if necessary (even if not force-enabled everywhere)
    TlsConfig tlsDefaults = TlsUtils.extractTlsConfig(_serverConf, Server.SERVER_TLS_PREFIX);
    if (StringUtils.isNotBlank(tlsDefaults.getKeyStorePath()) || StringUtils.isNotBlank(
        tlsDefaults.getTrustStorePath())) {
      LOGGER.info("Installing default SSL context for any client requests");
      TlsUtils.installDefaultSSLSocketFactory(tlsDefaults);
    }

    LOGGER.info("Initializing accessControlFactory");
    String accessControlFactoryClass =
        _serverConf.getProperty(Server.ACCESS_CONTROL_FACTORY_CLASS, Server.DEFAULT_ACCESS_CONTROL_FACTORY_CLASS);
    LOGGER.info("Using class: {} as the AccessControlFactory", accessControlFactoryClass);
    try {
      _accessControlFactory = PluginManager.get().createInstance(accessControlFactoryClass);
    } catch (Exception e) {
      throw new RuntimeException(
          "Caught exception while creating new AccessControlFactory instance using class '" + accessControlFactoryClass
              + "'", e);
    }

    // Create a thread pool used for mutable lucene index searches, with size based on query_worker_threads config
    LOGGER.info("Initializing lucene searcher thread pool");
    int queryWorkerThreads =
        _serverConf.getProperty(ResourceManager.QUERY_WORKER_CONFIG_KEY, ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
    _realtimeLuceneTextIndexSearcherPool = RealtimeLuceneTextIndexSearcherPool.init(queryWorkerThreads);

    // Initialize RealtimeLuceneIndexRefreshManager with max refresh threads and min refresh interval configs
    LOGGER.info("Initializing lucene refresh manager");
    int luceneMaxRefreshThreads =
        _serverConf.getProperty(Server.LUCENE_MAX_REFRESH_THREADS, Server.DEFAULT_LUCENE_MAX_REFRESH_THREADS);
    int luceneMinRefreshIntervalDuration =
        _serverConf.getProperty(Server.LUCENE_MIN_REFRESH_INTERVAL_MS, Server.DEFAULT_LUCENE_MIN_REFRESH_INTERVAL_MS);
    _realtimeLuceneTextIndexRefreshManager =
        RealtimeLuceneIndexRefreshManager.init(luceneMaxRefreshThreads, luceneMinRefreshIntervalDuration);

    LOGGER.info("Initializing server instance and registering state model factory");
    Utils.logVersions();
    ControllerLeaderLocator.create(_helixManager);
    ServerSegmentCompletionProtocolHandler.init(
        _serverConf.subset(SegmentCompletionProtocol.PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER));

    int maxPreprocessConcurrency = Integer.parseInt(
        _serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM,
            Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM));
    int maxPreprocessConcurrencyBeforeServingQueries = Integer.parseInt(
        _serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES,
            Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES));
    // Relax throttling until the server is ready to serve queries
    SegmentAllIndexPreprocessThrottler segmentAllIndexPreprocessThrottler =
        new SegmentAllIndexPreprocessThrottler(maxPreprocessConcurrency, maxPreprocessConcurrencyBeforeServingQueries,
            false);
    int maxStarTreePreprocessConcurrency = Integer.parseInt(
        _serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM,
            Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM));
    int maxStarTreePreprocessConcurrencyBeforeServingQueries = Integer.parseInt(
        _serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES,
            Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES));
    // Relax throttling until the server is ready to serve queries
    SegmentStarTreePreprocessThrottler segmentStarTreePreprocessThrottler =
        new SegmentStarTreePreprocessThrottler(maxStarTreePreprocessConcurrency,
            maxStarTreePreprocessConcurrencyBeforeServingQueries, false);
    int maxDownloadConcurrency = Integer.parseInt(
        _serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM,
            Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM));
    int maxDownloadConcurrencyBeforeServingQueries = Integer.parseInt(
        _serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES,
            Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES));
    // Relax throttling until the server is ready to serve queries
    SegmentDownloadThrottler segmentDownloadThrottler =
        new SegmentDownloadThrottler(maxDownloadConcurrency, maxDownloadConcurrencyBeforeServingQueries, false);
    _segmentOperationsThrottler =
        new SegmentOperationsThrottler(segmentAllIndexPreprocessThrottler, segmentStarTreePreprocessThrottler,
            segmentDownloadThrottler);

    SendStatsPredicate sendStatsPredicate = SendStatsPredicate.create(_serverConf);
    ServerConf serverConf = new ServerConf(_serverConf);
    _serverInstance = new ServerInstance(serverConf, _helixManager, _accessControlFactory, _segmentOperationsThrottler,
        sendStatsPredicate);
    ServerMetrics serverMetrics = _serverInstance.getServerMetrics();

    InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
    // The data manager reads readiness lazily through this supplier; the flag is flipped to true further below, once
    // the query server has been started.
    instanceDataManager.setSupplierOfIsServerReadyToServeQueries(() -> _isServerReadyToServeQueries);
    // initialize the thread accountant for query killing
    Tracing.ThreadAccountantOps.initializeThreadAccountant(
        _serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId,
        org.apache.pinot.spi.config.instance.InstanceType.SERVER);
    initSegmentFetcher(_serverConf);
    StateModelFactory<?> stateModelFactory =
        new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager);
    _helixManager.getStateMachineEngine()
        .registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(), stateModelFactory);
    // Start the data manager as a pre-connect callback so that it starts after connecting to the ZK in order to access
    // the property store, but before receiving state transitions
    _helixManager.addPreConnectCallback(_serverInstance::startDataManager);

    LOGGER.info("Connecting Helix manager");
    _helixManager.connect();
    _helixAdmin = _helixManager.getClusterManagmentTool();
    updateInstanceConfigIfNeeded(serverConf);

    LOGGER.info("Initializing and registering the DefaultClusterConfigChangeHandler");
    try {
      _helixManager.addClusterfigChangeListener(_clusterConfigChangeHandler);
    } catch (Exception e) {
      LOGGER.error("Failed to register DefaultClusterConfigChangeHandler as the Helix ClusterConfigChangeListener", e);
    }
    _clusterConfigChangeHandler.registerClusterConfigChangeListener(_segmentOperationsThrottler);

    LOGGER.info("Initializing and registering the SendStatsPredicate");
    try {
      _helixManager.addInstanceConfigChangeListener(sendStatsPredicate);
    } catch (Exception e) {
      LOGGER.error("Failed to register SendStatsPredicate as the Helix InstanceConfigChangeListener", e);
    }

    // Start restlet server for admin API endpoint
    LOGGER.info("Starting server admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs));
    _adminApiApplication = createServerAdminApp();
    _adminApiApplication.start(_listenerConfigs);

    // Init QueryRewriterFactory
    LOGGER.info("Initializing QueryRewriterFactory");
    QueryRewriterFactory.init(_serverConf.getProperty(Server.CONFIG_OF_SERVER_QUERY_REWRITER_CLASS_NAMES));

    // Register message handler factory
    SegmentMessageHandlerFactory messageHandlerFactory =
        new SegmentMessageHandlerFactory(instanceDataManager, serverMetrics);
    _helixManager.getMessagingService()
        .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), messageHandlerFactory);

    serverMetrics.addCallbackGauge(Helix.INSTANCE_CONNECTED_METRIC_NAME, () -> _helixManager.isConnected() ? 1L : 0L);
    // Registered after the initial connect(), so this meter only counts subsequent (re)connects
    _helixManager.addPreConnectCallback(
        () -> serverMetrics.addMeteredGlobalValue(ServerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L));

    // Register the service status handler
    registerServiceStatusHandler();

    if (_serverConf.getProperty(Server.CONFIG_OF_STARTUP_ENABLE_SERVICE_STATUS_CHECK,
        Server.DEFAULT_STARTUP_ENABLE_SERVICE_STATUS_CHECK)) {
      long endTimeMs =
          startTimeMs + _serverConf.getProperty(Server.CONFIG_OF_STARTUP_TIMEOUT_MS, Server.DEFAULT_STARTUP_TIMEOUT_MS);
      try {
        startupServiceStatusCheck(endTimeMs);
      } catch (Exception e) {
        LOGGER.error("Caught exception while checking service status. Stopping server.", e);
        // Tear down the admin API and the Helix connection. Each step is attempted independently so that a failure in
        // one does not skip the other, and cleanup failures are attached to the original exception instead of
        // replacing it.
        // NOTE(review): other components initialized earlier in this method (lucene searcher pool / refresh manager,
        // the data manager started via the pre-connect callback) are not stopped on this path — confirm whether they
        // need explicit cleanup.
        try {
          _adminApiApplication.stop();
        } catch (Exception stopException) {
          e.addSuppressed(stopException);
        }
        try {
          _helixManager.disconnect();
        } catch (Exception disconnectException) {
          e.addSuppressed(disconnectException);
        }
        throw e;
      }
    }

    // Initialize regex pattern factory
    PatternFactory.init(
        _serverConf.getProperty(Server.CONFIG_OF_SERVER_QUERY_REGEX_CLASS, Server.DEFAULT_SERVER_QUERY_REGEX_CLASS));

    preServeQueries();

    // Enable Server level realtime ingestion rate limiter
    RealtimeConsumptionRateManager.getInstance().createServerRateLimiter(_serverConf, serverMetrics);

    // Start the query server after finishing the service status check. If the query server is started before all the
    // segments are loaded, broker might not have finished processing the callback of routing table update, and start
    // querying the server prematurely.
    _serverInstance.startQueryServer();
    _helixAdmin.setConfig(_instanceConfigScope,
        Collections.singletonMap(Helix.IS_SHUTDOWN_IN_PROGRESS, Boolean.toString(false)));
    _isServerReadyToServeQueries = true;
    // Throttling for realtime consumption is disabled up to this point to allow maximum consumption during startup time
    RealtimeConsumptionRateManager.getInstance().enableThrottling();

    LOGGER.info("Pinot server ready");

    // Create metrics for mmap stuff
    serverMetrics.addCallbackGauge("memory.directBufferCount", PinotDataBuffer::getDirectBufferCount);
    serverMetrics.addCallbackGauge("memory.directBufferUsage", PinotDataBuffer::getDirectBufferUsage);
    serverMetrics.addCallbackGauge("memory.mmapBufferCount", PinotDataBuffer::getMmapBufferCount);
    serverMetrics.addCallbackGauge("memory.mmapBufferUsage", PinotDataBuffer::getMmapBufferUsage);
    serverMetrics.addCallbackGauge("memory.allocationFailureCount", PinotDataBuffer::getAllocationFailureCount);

    // Add ZK buffer size metric (falls back to 0xfffff when the jute.maxbuffer system property is not set)
    serverMetrics.setOrUpdateGlobalGauge(ServerGauge.ZK_JUTE_MAX_BUFFER,
        () -> Integer.getInteger(ZkSystemPropertyKeys.JUTE_MAXBUFFER, 0xfffff));

    // Track metric for queries disabled
    _serverQueriesDisabledTracker =
        new ServerQueriesDisabledTracker(_helixClusterName, _instanceId, _helixManager, serverMetrics);
    _serverQueriesDisabledTracker.start();

    // Add metrics for consumer directory usage; reports CONSUMER_DIRECTORY_EXCEPTION_VALUE if sizing fails
    serverMetrics.setOrUpdateGlobalGauge(ServerGauge.REALTIME_CONSUMER_DIR_USAGE, () -> {
      List<File> instanceConsumerDirs = instanceDataManager.getConsumerDirPaths();
      long totalSize = 0;
      try {
        for (File consumerDir : instanceConsumerDirs) {
          if (consumerDir.exists()) {
            totalSize += FileUtils.sizeOfDirectory(consumerDir);
          }
        }
        return totalSize;
      } catch (Exception e) {
        LOGGER.warn("Failed to gather size info for consumer directories", e);
        return CONSUMER_DIRECTORY_EXCEPTION_VALUE;
      }
    });

    long startupDurationMs = System.currentTimeMillis() - startTimeMs;
    // Compare from the constant side so a missing status is recorded as a failure instead of throwing here, after the
    // server is already serving queries
    if (Status.GOOD.equals(ServiceStatus.getServiceStatus(_instanceId))) {
      serverMetrics.addTimedValue(
          ServerTimer.STARTUP_SUCCESS_DURATION_MS, startupDurationMs, TimeUnit.MILLISECONDS);
    } else {
      serverMetrics.addTimedValue(
          ServerTimer.STARTUP_FAILURE_DURATION_MS, startupDurationMs, TimeUnit.MILLISECONDS);
    }
  }