in pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java [595:815]
/**
 * Starts the Pinot server. The steps below run strictly in this order:
 * <ol>
 *   <li>Install a default SSL socket factory when a key store or trust store path is configured.</li>
 *   <li>Instantiate the access control factory, the realtime Lucene searcher pool and refresh manager, and the
 *       segment preprocess / star-tree preprocess / download throttlers.</li>
 *   <li>Create the {@code ServerInstance}, initialize the thread accountant and segment fetcher, register the
 *       segment state model factory, and connect the Helix manager (the data manager is started from a
 *       pre-connect callback).</li>
 *   <li>Register the cluster config and instance config listeners, start the admin API application, initialize
 *       the query rewriter factory, and register the message handler factory and service status handler.</li>
 *   <li>If enabled by config, run the startup service status check. On failure the admin application is stopped,
 *       the Helix manager is disconnected and the exception is rethrown.</li>
 *   <li>Start the query server, set the shutdown-in-progress instance config to {@code false}, mark the server as
 *       ready to serve queries and enable realtime consumption throttling.</li>
 *   <li>Register memory, ZK buffer and consumer directory gauges, start the queries-disabled tracker, and record
 *       the startup duration.</li>
 * </ol>
 *
 * @throws RuntimeException if the configured access control factory class cannot be instantiated
 * @throws Exception if any other startup step fails, including the startup service status check. Failures to
 *     register the two Helix config change listeners are only logged and do not abort the startup.
 */
public void start()
throws Exception {
LOGGER.info("Starting Pinot server (Version: {})", PinotVersion.VERSION);
LOGGER.info("Server configs: {}", new PinotAppConfigs(getConfig()).toJSONString());
// Captured first so that the startup timeout and the startup duration metric both cover the whole method
long startTimeMs = System.currentTimeMillis();
// install default SSL context if necessary (even if not force-enabled everywhere)
TlsConfig tlsDefaults = TlsUtils.extractTlsConfig(_serverConf, Server.SERVER_TLS_PREFIX);
if (StringUtils.isNotBlank(tlsDefaults.getKeyStorePath()) || StringUtils.isNotBlank(
tlsDefaults.getTrustStorePath())) {
LOGGER.info("Installing default SSL context for any client requests");
TlsUtils.installDefaultSSLSocketFactory(tlsDefaults);
}
LOGGER.info("Initializing accessControlFactory");
String accessControlFactoryClass =
_serverConf.getProperty(Server.ACCESS_CONTROL_FACTORY_CLASS, Server.DEFAULT_ACCESS_CONTROL_FACTORY_CLASS);
LOGGER.info("Using class: {} as the AccessControlFactory", accessControlFactoryClass);
// Any instantiation failure is wrapped (with its cause) so that the offending class name shows up in the message
try {
_accessControlFactory = PluginManager.get().createInstance(accessControlFactoryClass);
} catch (Exception e) {
throw new RuntimeException(
"Caught exception while creating new AccessControlFactory instance using class '" + accessControlFactoryClass
+ "'", e);
}
// Create a thread pool used for mutable lucene index searches, with size based on query_worker_threads config
LOGGER.info("Initializing lucene searcher thread pool");
int queryWorkerThreads =
_serverConf.getProperty(ResourceManager.QUERY_WORKER_CONFIG_KEY, ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
_realtimeLuceneTextIndexSearcherPool = RealtimeLuceneTextIndexSearcherPool.init(queryWorkerThreads);
// Initialize RealtimeLuceneIndexRefreshManager with max refresh threads and min refresh interval configs
LOGGER.info("Initializing lucene refresh manager");
int luceneMaxRefreshThreads =
_serverConf.getProperty(Server.LUCENE_MAX_REFRESH_THREADS, Server.DEFAULT_LUCENE_MAX_REFRESH_THREADS);
int luceneMinRefreshIntervalDuration =
_serverConf.getProperty(Server.LUCENE_MIN_REFRESH_INTERVAL_MS, Server.DEFAULT_LUCENE_MIN_REFRESH_INTERVAL_MS);
_realtimeLuceneTextIndexRefreshManager =
RealtimeLuceneIndexRefreshManager.init(luceneMaxRefreshThreads, luceneMinRefreshIntervalDuration);
LOGGER.info("Initializing server instance and registering state model factory");
Utils.logVersions();
ControllerLeaderLocator.create(_helixManager);
ServerSegmentCompletionProtocolHandler.init(
_serverConf.subset(SegmentCompletionProtocol.PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER));
// Each throttler below is built from two parsed limits: the regular parallelism and a separate
// "before serving queries" parallelism.
// NOTE(review): the trailing 'false' constructor argument is presumably the initial "is serving queries" state -
// confirm against the throttler constructors.
int maxPreprocessConcurrency = Integer.parseInt(
_serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM,
Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM));
int maxPreprocessConcurrencyBeforeServingQueries = Integer.parseInt(
_serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES,
Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES));
// Relax throttling until the server is ready to serve queries
SegmentAllIndexPreprocessThrottler segmentAllIndexPreprocessThrottler =
new SegmentAllIndexPreprocessThrottler(maxPreprocessConcurrency, maxPreprocessConcurrencyBeforeServingQueries,
false);
int maxStarTreePreprocessConcurrency = Integer.parseInt(
_serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM,
Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM));
int maxStarTreePreprocessConcurrencyBeforeServingQueries = Integer.parseInt(
_serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES,
Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES));
// Relax throttling until the server is ready to serve queries
SegmentStarTreePreprocessThrottler segmentStarTreePreprocessThrottler =
new SegmentStarTreePreprocessThrottler(maxStarTreePreprocessConcurrency,
maxStarTreePreprocessConcurrencyBeforeServingQueries, false);
int maxDownloadConcurrency = Integer.parseInt(
_serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM,
Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM));
int maxDownloadConcurrencyBeforeServingQueries = Integer.parseInt(
_serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES,
Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES));
// Relax throttling until the server is ready to serve queries
SegmentDownloadThrottler segmentDownloadThrottler =
new SegmentDownloadThrottler(maxDownloadConcurrency, maxDownloadConcurrencyBeforeServingQueries, false);
// Bundle the three throttlers into the single object that is handed to the server instance and later registered
// as a cluster config change listener
_segmentOperationsThrottler =
new SegmentOperationsThrottler(segmentAllIndexPreprocessThrottler, segmentStarTreePreprocessThrottler,
segmentDownloadThrottler);
SendStatsPredicate sendStatsPredicate = SendStatsPredicate.create(_serverConf);
ServerConf serverConf = new ServerConf(_serverConf);
_serverInstance = new ServerInstance(serverConf, _helixManager, _accessControlFactory, _segmentOperationsThrottler,
sendStatsPredicate);
ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
// The supplier reads the field on every call, so the data manager observes the flag flipping to true later in
// this method
instanceDataManager.setSupplierOfIsServerReadyToServeQueries(() -> _isServerReadyToServeQueries);
// initialize the thread accountant for query killing
Tracing.ThreadAccountantOps.initializeThreadAccountant(
_serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId,
org.apache.pinot.spi.config.instance.InstanceType.SERVER);
initSegmentFetcher(_serverConf);
StateModelFactory<?> stateModelFactory =
new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager);
_helixManager.getStateMachineEngine()
.registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(), stateModelFactory);
// Start the data manager as a pre-connect callback so that it starts after connecting to the ZK in order to access
// the property store, but before receiving state transitions
_helixManager.addPreConnectCallback(_serverInstance::startDataManager);
LOGGER.info("Connecting Helix manager");
_helixManager.connect();
_helixAdmin = _helixManager.getClusterManagmentTool();
updateInstanceConfigIfNeeded(serverConf);
LOGGER.info("Initializing and registering the DefaultClusterConfigChangeHandler");
// Best effort: a registration failure is logged and startup continues.
// Note that 'addClusterfigChangeListener' is the method name as spelled here; it is not corrected.
try {
_helixManager.addClusterfigChangeListener(_clusterConfigChangeHandler);
} catch (Exception e) {
LOGGER.error("Failed to register DefaultClusterConfigChangeHandler as the Helix ClusterConfigChangeListener", e);
}
// Runs regardless of whether the Helix registration above succeeded
_clusterConfigChangeHandler.registerClusterConfigChangeListener(_segmentOperationsThrottler);
LOGGER.info("Initializing and registering the SendStatsPredicate");
// Best effort as well: a registration failure is logged and startup continues
try {
_helixManager.addInstanceConfigChangeListener(sendStatsPredicate);
} catch (Exception e) {
LOGGER.error("Failed to register SendStatsPredicate as the Helix InstanceConfigChangeListener", e);
}
// Start restlet server for admin API endpoint
LOGGER.info("Starting server admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs));
_adminApiApplication = createServerAdminApp();
_adminApiApplication.start(_listenerConfigs);
// Init QueryRewriterFactory
LOGGER.info("Initializing QueryRewriterFactory");
QueryRewriterFactory.init(_serverConf.getProperty(Server.CONFIG_OF_SERVER_QUERY_REWRITER_CLASS_NAMES));
// Register message handler factory
SegmentMessageHandlerFactory messageHandlerFactory =
new SegmentMessageHandlerFactory(instanceDataManager, serverMetrics);
_helixManager.getMessagingService()
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), messageHandlerFactory);
// Gauge reports 1 while the Helix manager is connected and 0 otherwise
serverMetrics.addCallbackGauge(Helix.INSTANCE_CONNECTED_METRIC_NAME, () -> _helixManager.isConnected() ? 1L : 0L);
// Adds 1 to the reconnect meter each time the callback runs.
// NOTE(review): this callback is added after connect() above, so it presumably fires only on later reconnects -
// confirm against the Helix pre-connect callback semantics.
_helixManager.addPreConnectCallback(
() -> serverMetrics.addMeteredGlobalValue(ServerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L));
// Register the service status handler
registerServiceStatusHandler();
if (_serverConf.getProperty(Server.CONFIG_OF_STARTUP_ENABLE_SERVICE_STATUS_CHECK,
Server.DEFAULT_STARTUP_ENABLE_SERVICE_STATUS_CHECK)) {
// The deadline is measured from the start of this method, so time already spent above counts against the timeout
long endTimeMs =
startTimeMs + _serverConf.getProperty(Server.CONFIG_OF_STARTUP_TIMEOUT_MS, Server.DEFAULT_STARTUP_TIMEOUT_MS);
try {
startupServiceStatusCheck(endTimeMs);
} catch (Exception e) {
LOGGER.error("Caught exception while checking service status. Stopping server.", e);
// If we exit here, only the _adminApiApplication and _helixManager are initialized, so we only stop them
_adminApiApplication.stop();
_helixManager.disconnect();
throw e;
}
}
// Initialize regex pattern factory
PatternFactory.init(
_serverConf.getProperty(Server.CONFIG_OF_SERVER_QUERY_REGEX_CLASS, Server.DEFAULT_SERVER_QUERY_REGEX_CLASS));
preServeQueries();
// Enable Server level realtime ingestion rate limiter
RealtimeConsumptionRateManager.getInstance().createServerRateLimiter(_serverConf, serverMetrics);
// Start the query server after finishing the service status check. If the query server is started before all the
// segments are loaded, broker might not have finished processing the callback of routing table update, and start
// querying the server pre-maturely.
_serverInstance.startQueryServer();
// Write IS_SHUTDOWN_IN_PROGRESS=false into the instance config now that the query server is up
_helixAdmin.setConfig(_instanceConfigScope,
Collections.singletonMap(Helix.IS_SHUTDOWN_IN_PROGRESS, Boolean.toString(false)));
// From here on the supplier handed to the instance data manager above returns true
_isServerReadyToServeQueries = true;
// Throttling for realtime consumption is disabled up to this point to allow maximum consumption during startup time
RealtimeConsumptionRateManager.getInstance().enableThrottling();
LOGGER.info("Pinot server ready");
// Create metrics for mmap stuff
serverMetrics.addCallbackGauge("memory.directBufferCount", PinotDataBuffer::getDirectBufferCount);
serverMetrics.addCallbackGauge("memory.directBufferUsage", PinotDataBuffer::getDirectBufferUsage);
serverMetrics.addCallbackGauge("memory.mmapBufferCount", PinotDataBuffer::getMmapBufferCount);
serverMetrics.addCallbackGauge("memory.mmapBufferUsage", PinotDataBuffer::getMmapBufferUsage);
serverMetrics.addCallbackGauge("memory.allocationFailureCount", PinotDataBuffer::getAllocationFailureCount);
// Add ZK buffer size metric
// Falls back to 0xfffff when the system property is not set to a valid integer
serverMetrics.setOrUpdateGlobalGauge(ServerGauge.ZK_JUTE_MAX_BUFFER,
() -> Integer.getInteger(ZkSystemPropertyKeys.JUTE_MAXBUFFER, 0xfffff));
// Track metric for queries disabled
_serverQueriesDisabledTracker =
new ServerQueriesDisabledTracker(_helixClusterName, _instanceId, _helixManager, serverMetrics);
_serverQueriesDisabledTracker.start();
// Add metrics for consumer directory usage
// The gauge sums the on-disk size of every consumer directory that currently exists; if anything throws while
// measuring, it logs a warning and reports CONSUMER_DIRECTORY_EXCEPTION_VALUE instead of a partial sum
serverMetrics.setOrUpdateGlobalGauge(ServerGauge.REALTIME_CONSUMER_DIR_USAGE, () -> {
List<File> instanceConsumerDirs = instanceDataManager.getConsumerDirPaths();
long totalSize = 0;
try {
for (File consumerDir : instanceConsumerDirs) {
if (consumerDir.exists()) {
totalSize += FileUtils.sizeOfDirectory(consumerDir);
}
}
return totalSize;
} catch (Exception e) {
LOGGER.warn("Failed to gather size info for consumer directories", e);
return CONSUMER_DIRECTORY_EXCEPTION_VALUE;
}
});
// Record the total startup time under the success timer when the service status is GOOD, and under the failure
// timer for any other status
long startupDurationMs = System.currentTimeMillis() - startTimeMs;
if (ServiceStatus.getServiceStatus(_instanceId).equals(Status.GOOD)) {
serverMetrics.addTimedValue(
ServerTimer.STARTUP_SUCCESS_DURATION_MS, startupDurationMs, TimeUnit.MILLISECONDS);
} else {
serverMetrics.addTimedValue(
ServerTimer.STARTUP_FAILURE_DURATION_MS, startupDurationMs, TimeUnit.MILLISECONDS);
}
}