in pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java [283:517]
/**
 * Starts the broker.
 *
 * <p>The steps run strictly in this order, and later steps use objects created by earlier ones:
 * <ol>
 *   <li>connect the spectator Helix manager and obtain the admin, property store and data accessor from it;</li>
 *   <li>set up the metrics registry and broker metrics;</li>
 *   <li>create and initialize the routing manager, access control factory, quota manager, rewriter factories,
 *       function registry, table cache, event listener factory and failure detector;</li>
 *   <li>build the single-stage handler and, when configured, the multi-stage and time-series handlers, then start
 *       the delegating request handler;</li>
 *   <li>start the admin application and, when enabled, the broker gRPC server;</li>
 *   <li>build the cluster change mediator and register it as a listener on the spectator manager;</li>
 *   <li>connect the participant Helix manager and register the service status handler.</li>
 * </ol>
 *
 * <p>{@code _isStarting} is set to {@code true} on entry and is reset to {@code false} only after every step has
 * completed; there is no cleanup path in this method, so the flag stays {@code true} if a step throws.
 *
 * @throws Exception if any step of the startup sequence throws
 */
public void start()
throws Exception {
LOGGER.info("Starting Pinot broker (Version: {})", PinotVersion.VERSION);
LOGGER.info("Broker configs: {}", new PinotAppConfigs(getConfig()).toJSONString());
// Captured up front so the total startup duration can be reported as a timer at the end of this method
long startTimeMs = System.currentTimeMillis();
_isStarting = true;
Utils.logVersions();
// The spectator manager is connected first: the admin tool, property store and data accessor used by the
// components below are all obtained from it
LOGGER.info("Connecting spectator Helix manager");
_spectatorHelixManager =
HelixManagerFactory.getZKHelixManager(_clusterName, _instanceId, InstanceType.SPECTATOR, _zkServers);
_spectatorHelixManager.connect();
_helixAdmin = _spectatorHelixManager.getClusterManagmentTool();
_propertyStore = _spectatorHelixManager.getHelixPropertyStore();
_helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor();
LOGGER.info("Setting up broker request handler");
// Set up metric registry and broker metrics
_metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(_brokerConf.subset(Broker.METRICS_CONFIG_PREFIX));
_brokerMetrics = new BrokerMetrics(
_brokerConf.getProperty(Broker.CONFIG_OF_METRICS_NAME_PREFIX, Broker.DEFAULT_METRICS_NAME_PREFIX),
_metricsRegistry,
_brokerConf.getProperty(Broker.CONFIG_OF_ENABLE_TABLE_LEVEL_METRICS, Broker.DEFAULT_ENABLE_TABLE_LEVEL_METRICS),
_brokerConf.getProperty(Broker.CONFIG_OF_ALLOWED_TABLES_FOR_EMITTING_METRICS, Collections.emptyList()));
_brokerMetrics.initializeGlobalMeters();
// Publish the broker version, the ZK jute max buffer and the adaptive server selector type as global gauges
_brokerMetrics.setValueOfGlobalGauge(BrokerGauge.VERSION, PinotVersion.VERSION_METRIC_NAME, 1);
// The jute max buffer is read from the JVM system property; 0xfffff is reported when the property is not set
_brokerMetrics.setValueOfGlobalGauge(BrokerGauge.ZK_JUTE_MAX_BUFFER,
Integer.getInteger(ZkSystemPropertyKeys.JUTE_MAXBUFFER, 0xfffff));
_brokerMetrics.setValueOfGlobalGauge(BrokerGauge.ADAPTIVE_SERVER_SELECTOR_TYPE,
_brokerConf.getProperty(Broker.AdaptiveServerSelector.CONFIG_OF_TYPE,
Broker.AdaptiveServerSelector.DEFAULT_TYPE), 1);
BrokerMetrics.register(_brokerMetrics);
// Set up request handling classes
_serverRoutingStatsManager = new ServerRoutingStatsManager(_brokerConf, _brokerMetrics);
_serverRoutingStatsManager.init();
_routingManager = new BrokerRoutingManager(_brokerMetrics, _serverRoutingStatsManager, _brokerConf);
_routingManager.init(_spectatorHelixManager);
final PinotConfiguration factoryConf = _brokerConf.subset(Broker.ACCESS_CONTROL_CONFIG_PREFIX);
// Adding cluster name to the config so that it can be used by the AccessControlFactory
factoryConf.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, _brokerConf.getProperty(Helix.CONFIG_OF_CLUSTER_NAME));
_accessControlFactory = AccessControlFactory.loadFactory(factoryConf, _propertyStore);
_queryQuotaManager = new HelixExternalViewBasedQueryQuotaManager(_brokerMetrics, _instanceId);
_queryQuotaManager.init(_spectatorHelixManager);
// Initialize QueryRewriterFactory
LOGGER.info("Initializing QueryRewriterFactory");
QueryRewriterFactory.init(_brokerConf.getProperty(Broker.CONFIG_OF_BROKER_QUERY_REWRITER_CLASS_NAMES));
LOGGER.info("Initializing ResultRewriterFactory");
ResultRewriterFactory.init(_brokerConf.getProperty(Broker.CONFIG_OF_BROKER_RESULT_REWRITER_CLASS_NAMES));
// Initialize FunctionRegistry before starting the broker request handler
FunctionRegistry.init();
// The table cache is shared by every request handler created below
boolean caseInsensitive =
_brokerConf.getProperty(Helix.ENABLE_CASE_INSENSITIVE_KEY, Helix.DEFAULT_ENABLE_CASE_INSENSITIVE);
TableCache tableCache = new TableCache(_propertyStore, caseInsensitive);
LOGGER.info("Initializing Broker Event Listener Factory");
BrokerQueryEventListenerFactory.init(_brokerConf.subset(Broker.EVENT_LISTENER_CONFIG_PREFIX));
// Initialize the failure detector that removes servers from the broker routing table if they are not healthy
_failureDetector = FailureDetectorFactory.getFailureDetector(_brokerConf, _brokerMetrics);
_failureDetector.registerHealthyServerNotifier(
instanceId -> _routingManager.includeServerToRouting(instanceId));
_failureDetector.registerUnhealthyServerNotifier(
instanceId -> _routingManager.excludeServerFromRouting(instanceId));
_failureDetector.start();
// Create Broker request handler.
String brokerId = _brokerConf.getProperty(Broker.CONFIG_OF_BROKER_ID, getDefaultBrokerId());
String brokerRequestHandlerType =
_brokerConf.getProperty(Broker.BROKER_REQUEST_HANDLER_TYPE, Broker.DEFAULT_BROKER_REQUEST_HANDLER_TYPE);
// Single-stage handler: gRPC when explicitly configured (case-insensitive match), otherwise Netty
BaseSingleStageBrokerRequestHandler singleStageBrokerRequestHandler;
if (brokerRequestHandlerType.equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE)) {
singleStageBrokerRequestHandler = new GrpcBrokerRequestHandler(_brokerConf, brokerId, _routingManager,
_accessControlFactory, _queryQuotaManager, tableCache, _failureDetector);
} else {
// Default request handler type, i.e. netty
NettyConfig nettyDefaults = NettyConfig.extractNettyConfig(_brokerConf, Broker.BROKER_NETTY_PREFIX);
// Configure TLS for netty connection to server
// The TLS config stays null (and is passed as null) unless Netty TLS is enabled in the broker config
TlsConfig tlsDefaults = null;
if (_brokerConf.getProperty(Broker.BROKER_NETTYTLS_ENABLED, false)) {
tlsDefaults = TlsUtils.extractTlsConfig(_brokerConf, Broker.BROKER_TLS_PREFIX);
}
singleStageBrokerRequestHandler =
new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
_queryQuotaManager, tableCache, nettyDefaults, tlsDefaults, _serverRoutingStatsManager,
_failureDetector);
}
// The multi-stage handler, its throttler and the query dispatcher are only created when the multi-stage engine
// is enabled; otherwise the handler and dispatcher stay null and this method does not assign the throttler
MultiStageBrokerRequestHandler multiStageBrokerRequestHandler = null;
QueryDispatcher queryDispatcher = null;
if (_brokerConf.getProperty(Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, Helix.DEFAULT_MULTI_STAGE_ENGINE_ENABLED)) {
_multiStageQueryThrottler = new MultiStageQueryThrottler();
_multiStageQueryThrottler.init(_spectatorHelixManager);
// multi-stage request handler uses both Netty and GRPC ports.
// worker requires both the "Netty port" for protocol transport; and "GRPC port" for mailbox transport.
// TODO: decouple protocol and engine selection.
queryDispatcher = createQueryDispatcher(_brokerConf);
multiStageBrokerRequestHandler =
new MultiStageBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
_queryQuotaManager, tableCache, _multiStageQueryThrottler, _failureDetector);
}
// The time-series handler is created only when the enabled-languages config is non-blank. It needs the query
// dispatcher, which is only non-null when the multi-stage branch above ran, hence the precondition.
TimeSeriesRequestHandler timeSeriesRequestHandler = null;
if (StringUtils.isNotBlank(_brokerConf.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey()))) {
Preconditions.checkNotNull(queryDispatcher, "Multistage Engine should be enabled to use time-series engine");
timeSeriesRequestHandler = new TimeSeriesRequestHandler(_brokerConf, brokerId, _routingManager,
_accessControlFactory, _queryQuotaManager, tableCache, queryDispatcher);
}
LOGGER.info("Initializing PinotFSFactory");
PinotFSFactory.init(_brokerConf.subset(CommonConstants.Broker.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY));
// The response store implementation is looked up by its configured type, then initialized with the config
// subset keyed by that type
LOGGER.info("Initialize ResponseStore");
PinotConfiguration responseStoreConfiguration =
_brokerConf.subset(CommonConstants.CursorConfigs.PREFIX_OF_CONFIG_OF_RESPONSE_STORE);
String expirationTime = _brokerConf.getProperty(CommonConstants.CursorConfigs.RESULTS_EXPIRATION_INTERVAL,
CommonConstants.CursorConfigs.DEFAULT_RESULTS_EXPIRATION_INTERVAL);
_responseStore = (AbstractResponseStore) ResponseStoreService.getInstance().getResponseStore(
responseStoreConfiguration.getProperty(CommonConstants.CursorConfigs.RESPONSE_STORE_TYPE,
CommonConstants.CursorConfigs.DEFAULT_RESPONSE_STORE_TYPE));
_responseStore.init(responseStoreConfiguration.subset(_responseStore.getType()), _hostname, _port, brokerId,
_brokerMetrics, expirationTime);
// Wrap all handlers (the multi-stage and time-series ones may be null) in a single delegate and start it
_brokerRequestHandler =
new BrokerRequestHandlerDelegate(singleStageBrokerRequestHandler, multiStageBrokerRequestHandler,
timeSeriesRequestHandler, _responseStore);
_brokerRequestHandler.start();
// Enable/disable thread CPU time measurement through instance config.
ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(
_brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT,
CommonConstants.Broker.DEFAULT_ENABLE_THREAD_CPU_TIME_MEASUREMENT));
// Enable/disable thread memory allocation tracking through instance config
ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(
_brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT,
CommonConstants.Broker.DEFAULT_THREAD_ALLOCATED_BYTES_MEASUREMENT));
Tracing.ThreadAccountantOps.initializeThreadAccountant(
_brokerConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId,
org.apache.pinot.spi.config.instance.InstanceType.BROKER);
// The SQL query executor is built from the controller URL when one is configured, otherwise from the spectator
// Helix manager
String controllerUrl = _brokerConf.getProperty(Broker.CONTROLLER_URL);
if (controllerUrl != null) {
_sqlQueryExecutor = new SqlQueryExecutor(controllerUrl);
} else {
_sqlQueryExecutor = new SqlQueryExecutor(_spectatorHelixManager);
}
LOGGER.info("Starting broker admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs));
_brokerAdminApplication = createBrokerAdminApp();
_brokerAdminApplication.start(_listenerConfigs);
if (BrokerGrpcServer.isEnabled(_brokerConf)) {
LOGGER.info("Initializing BrokerGrpcServer");
_brokerGrpcServer = new BrokerGrpcServer(_brokerConf, brokerId, _brokerMetrics, _brokerRequestHandler);
_brokerGrpcServer.start();
} else {
LOGGER.info("BrokerGrpcServer is not enabled");
}
// For each change type: first initialize the handlers already present in the list (presumably registered by
// callers/subclasses before start() - confirm against the rest of the class), then append the broker's own
// components. Those components are appended after the init loop because they were already initialized earlier
// in this method.
LOGGER.info("Initializing cluster change mediator");
for (ClusterChangeHandler clusterConfigChangeHandler : _clusterConfigChangeHandlers) {
clusterConfigChangeHandler.init(_spectatorHelixManager);
}
_clusterConfigChangeHandlers.add(_queryQuotaManager);
if (_multiStageQueryThrottler != null) {
_clusterConfigChangeHandlers.add(_multiStageQueryThrottler);
}
for (ClusterChangeHandler idealStateChangeHandler : _idealStateChangeHandlers) {
idealStateChangeHandler.init(_spectatorHelixManager);
}
_idealStateChangeHandlers.add(_routingManager);
for (ClusterChangeHandler externalViewChangeHandler : _externalViewChangeHandlers) {
externalViewChangeHandler.init(_spectatorHelixManager);
}
_externalViewChangeHandlers.add(_routingManager);
_externalViewChangeHandlers.add(_queryQuotaManager);
if (_multiStageQueryThrottler != null) {
_externalViewChangeHandlers.add(_multiStageQueryThrottler);
}
for (ClusterChangeHandler instanceConfigChangeHandler : _instanceConfigChangeHandlers) {
instanceConfigChangeHandler.init(_spectatorHelixManager);
}
_instanceConfigChangeHandlers.add(_routingManager);
_instanceConfigChangeHandlers.add(_queryQuotaManager);
// No broker component is appended for live-instance changes; only the pre-existing handlers are initialized
for (ClusterChangeHandler liveInstanceChangeHandler : _liveInstanceChangeHandlers) {
liveInstanceChangeHandler.init(_spectatorHelixManager);
}
Map<ChangeType, List<ClusterChangeHandler>> clusterChangeHandlersMap = new HashMap<>();
clusterChangeHandlersMap.put(ChangeType.CLUSTER_CONFIG, _clusterConfigChangeHandlers);
clusterChangeHandlersMap.put(ChangeType.IDEAL_STATE, _idealStateChangeHandlers);
clusterChangeHandlersMap.put(ChangeType.EXTERNAL_VIEW, _externalViewChangeHandlers);
clusterChangeHandlersMap.put(ChangeType.INSTANCE_CONFIG, _instanceConfigChangeHandlers);
// LIVE_INSTANCE is the only change type that is mapped (and listened to, below) conditionally
if (!_liveInstanceChangeHandlers.isEmpty()) {
clusterChangeHandlersMap.put(ChangeType.LIVE_INSTANCE, _liveInstanceChangeHandlers);
}
// The mediator is started before it is registered as a listener on the spectator manager
_clusterChangeMediator = new ClusterChangeMediator(clusterChangeHandlersMap, _brokerMetrics);
_clusterChangeMediator.start();
_spectatorHelixManager.addIdealStateChangeListener(_clusterChangeMediator);
_spectatorHelixManager.addExternalViewChangeListener(_clusterChangeMediator);
_spectatorHelixManager.addInstanceConfigChangeListener(_clusterChangeMediator);
_spectatorHelixManager.addClusterfigChangeListener(_clusterChangeMediator);
if (!_liveInstanceChangeHandlers.isEmpty()) {
_spectatorHelixManager.addLiveInstanceChangeListener(_clusterChangeMediator);
}
// The participant manager is created and connected only after all request handling components above are set
// up; the state model factory and the message handler factory are registered on it before connect() is called
LOGGER.info("Connecting participant Helix manager");
_participantHelixManager =
HelixManagerFactory.getZKHelixManager(_clusterName, _instanceId, InstanceType.PARTICIPANT, _zkServers);
// Register state model factory
_participantHelixManager.getStateMachineEngine()
.registerStateModelFactory(BrokerResourceOnlineOfflineStateModelFactory.getStateModelDef(),
new BrokerResourceOnlineOfflineStateModelFactory(_propertyStore, _helixDataAccessor, _routingManager,
_queryQuotaManager));
// Register user-define message handler factory
_participantHelixManager.getMessagingService()
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
new BrokerUserDefinedMessageHandlerFactory(_routingManager, _queryQuotaManager));
_participantHelixManager.connect();
updateInstanceConfigAndBrokerResourceIfNeeded();
// Gauge reports 1 while the participant manager is connected and 0 otherwise
_brokerMetrics.addCallbackGauge(Helix.INSTANCE_CONNECTED_METRIC_NAME,
() -> _participantHelixManager.isConnected() ? 1L : 0L);
// Registered after the initial connect(); each invocation of the callback bumps the reconnect meter by one
_participantHelixManager.addPreConnectCallback(
() -> _brokerMetrics.addMeteredGlobalValue(BrokerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L));
// Initializing Groovy execution security
// The query-specific analyzer config is used; the value of the "all" analyzer config is passed as its default
GroovyFunctionEvaluator.configureGroovySecurity(
_brokerConf.getProperty(CommonConstants.Groovy.GROOVY_QUERY_STATIC_ANALYZER_CONFIG,
_brokerConf.getProperty(CommonConstants.Groovy.GROOVY_ALL_STATIC_ANALYZER_CONFIG)));
// Register the service status handler
registerServiceStatusHandler();
// Startup completed: clear the starting flag and record how long the whole sequence took
_isStarting = false;
_brokerMetrics.addTimedValue(BrokerTimer.STARTUP_SUCCESS_DURATION_MS,
System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS);
LOGGER.info("Finish starting Pinot broker");
}