public void start()

in pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java [283:517]


  /**
   * Starts the Pinot broker.
   *
   * <p>The steps run strictly in the order below; later steps use objects created by earlier ones:
   * <ol>
   *   <li>Connect the spectator Helix manager and cache the Helix admin, property store and data accessor obtained
   *       from it.</li>
   *   <li>Create the metrics registry and {@code BrokerMetrics}, publish a few global gauges and register the
   *       metrics.</li>
   *   <li>Create the routing stats manager, routing manager, access control factory, query quota manager, the
   *       query/result rewriter factories, the function registry, the table cache, the query event listener factory
   *       and the failure detector.</li>
   *   <li>Build the single-stage request handler (gRPC or Netty based) and, depending on configuration, the
   *       multi-stage and time-series request handlers.</li>
   *   <li>Initialize {@code PinotFSFactory} and the response store, then wrap all handlers in a
   *       {@code BrokerRequestHandlerDelegate} and start it.</li>
   *   <li>Configure thread resource usage tracking, create the {@code SqlQueryExecutor}, start the broker admin
   *       application and, if enabled, the broker gRPC server.</li>
   *   <li>Initialize the cluster change handlers, start the {@code ClusterChangeMediator} and register it as a
   *       listener on the spectator Helix manager.</li>
   *   <li>Create, configure and connect the participant Helix manager.</li>
   *   <li>Configure Groovy security, register the service status handler and record the startup duration.</li>
   * </ol>
   *
   * <p>{@code _isStarting} is set to {@code true} at the beginning and reset to {@code false} only after every step
   * has completed. This method has no try/finally, so if any step throws, the flag is left as {@code true} and
   * whatever was already started is not stopped here.
   *
   * @throws Exception if any of the startup steps fails
   */
  public void start()
      throws Exception {
    LOGGER.info("Starting Pinot broker (Version: {})", PinotVersion.VERSION);
    LOGGER.info("Broker configs: {}", new PinotAppConfigs(getConfig()).toJSONString());
    // Captured up front so that the total startup duration can be reported at the end of this method
    long startTimeMs = System.currentTimeMillis();
    _isStarting = true;
    Utils.logVersions();

    LOGGER.info("Connecting spectator Helix manager");
    _spectatorHelixManager =
        HelixManagerFactory.getZKHelixManager(_clusterName, _instanceId, InstanceType.SPECTATOR, _zkServers);
    _spectatorHelixManager.connect();
    // NOTE(review): "Managment" appears to be the actual spelling of this method in the Helix API - confirm before
    // renaming
    _helixAdmin = _spectatorHelixManager.getClusterManagmentTool();
    _propertyStore = _spectatorHelixManager.getHelixPropertyStore();
    _helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor();

    LOGGER.info("Setting up broker request handler");
    // Set up metric registry and broker metrics
    _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(_brokerConf.subset(Broker.METRICS_CONFIG_PREFIX));
    _brokerMetrics = new BrokerMetrics(
        _brokerConf.getProperty(Broker.CONFIG_OF_METRICS_NAME_PREFIX, Broker.DEFAULT_METRICS_NAME_PREFIX),
        _metricsRegistry,
        _brokerConf.getProperty(Broker.CONFIG_OF_ENABLE_TABLE_LEVEL_METRICS, Broker.DEFAULT_ENABLE_TABLE_LEVEL_METRICS),
        _brokerConf.getProperty(Broker.CONFIG_OF_ALLOWED_TABLES_FOR_EMITTING_METRICS, Collections.emptyList()));
    _brokerMetrics.initializeGlobalMeters();
    // The version is encoded in the gauge name; the gauge value itself is the constant 1
    _brokerMetrics.setValueOfGlobalGauge(BrokerGauge.VERSION, PinotVersion.VERSION_METRIC_NAME, 1);
    // Reports the JUTE_MAXBUFFER system property, falling back to 0xfffff when the property is not set
    _brokerMetrics.setValueOfGlobalGauge(BrokerGauge.ZK_JUTE_MAX_BUFFER,
        Integer.getInteger(ZkSystemPropertyKeys.JUTE_MAXBUFFER, 0xfffff));
    // Same pattern as the version gauge: the configured selector type is the key, the value is the constant 1
    _brokerMetrics.setValueOfGlobalGauge(BrokerGauge.ADAPTIVE_SERVER_SELECTOR_TYPE,
        _brokerConf.getProperty(Broker.AdaptiveServerSelector.CONFIG_OF_TYPE,
            Broker.AdaptiveServerSelector.DEFAULT_TYPE), 1);
    BrokerMetrics.register(_brokerMetrics);
    // Set up request handling classes
    _serverRoutingStatsManager = new ServerRoutingStatsManager(_brokerConf, _brokerMetrics);
    _serverRoutingStatsManager.init();
    _routingManager = new BrokerRoutingManager(_brokerMetrics, _serverRoutingStatsManager, _brokerConf);
    _routingManager.init(_spectatorHelixManager);
    final PinotConfiguration factoryConf = _brokerConf.subset(Broker.ACCESS_CONTROL_CONFIG_PREFIX);
    // Adding cluster name to the config so that it can be used by the AccessControlFactory
    factoryConf.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, _brokerConf.getProperty(Helix.CONFIG_OF_CLUSTER_NAME));
    _accessControlFactory = AccessControlFactory.loadFactory(factoryConf, _propertyStore);
    _queryQuotaManager = new HelixExternalViewBasedQueryQuotaManager(_brokerMetrics, _instanceId);
    _queryQuotaManager.init(_spectatorHelixManager);
    // Initialize QueryRewriterFactory
    LOGGER.info("Initializing QueryRewriterFactory");
    QueryRewriterFactory.init(_brokerConf.getProperty(Broker.CONFIG_OF_BROKER_QUERY_REWRITER_CLASS_NAMES));
    LOGGER.info("Initializing ResultRewriterFactory");
    ResultRewriterFactory.init(_brokerConf.getProperty(Broker.CONFIG_OF_BROKER_RESULT_REWRITER_CLASS_NAMES));
    // Initialize FunctionRegistry before starting the broker request handler
    FunctionRegistry.init();
    boolean caseInsensitive =
        _brokerConf.getProperty(Helix.ENABLE_CASE_INSENSITIVE_KEY, Helix.DEFAULT_ENABLE_CASE_INSENSITIVE);
    // The same table cache instance is shared by all request handlers created below
    TableCache tableCache = new TableCache(_propertyStore, caseInsensitive);

    LOGGER.info("Initializing Broker Event Listener Factory");
    BrokerQueryEventListenerFactory.init(_brokerConf.subset(Broker.EVENT_LISTENER_CONFIG_PREFIX));

    // Initialize the failure detector that removes servers from the broker routing table if they are not healthy
    _failureDetector = FailureDetectorFactory.getFailureDetector(_brokerConf, _brokerMetrics);
    // Both notifiers are registered before the detector is started. The lambdas read the _routingManager field
    // each time they are invoked rather than capturing the current reference.
    _failureDetector.registerHealthyServerNotifier(
        instanceId -> _routingManager.includeServerToRouting(instanceId));
    _failureDetector.registerUnhealthyServerNotifier(
        instanceId -> _routingManager.excludeServerFromRouting(instanceId));
    _failureDetector.start();

    // Create Broker request handler.
    String brokerId = _brokerConf.getProperty(Broker.CONFIG_OF_BROKER_ID, getDefaultBrokerId());
    String brokerRequestHandlerType =
        _brokerConf.getProperty(Broker.BROKER_REQUEST_HANDLER_TYPE, Broker.DEFAULT_BROKER_REQUEST_HANDLER_TYPE);
    BaseSingleStageBrokerRequestHandler singleStageBrokerRequestHandler;
    // The handler type is matched case-insensitively; anything other than the gRPC type selects the Netty handler
    if (brokerRequestHandlerType.equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE)) {
      singleStageBrokerRequestHandler = new GrpcBrokerRequestHandler(_brokerConf, brokerId, _routingManager,
          _accessControlFactory, _queryQuotaManager, tableCache, _failureDetector);
    } else {
      // Default request handler type, i.e. netty
      NettyConfig nettyDefaults = NettyConfig.extractNettyConfig(_brokerConf, Broker.BROKER_NETTY_PREFIX);
      // Configure TLS for netty connection to server
      // tlsDefaults stays null (and is passed as null to the handler) unless Netty TLS is explicitly enabled
      TlsConfig tlsDefaults = null;
      if (_brokerConf.getProperty(Broker.BROKER_NETTYTLS_ENABLED, false)) {
        tlsDefaults = TlsUtils.extractTlsConfig(_brokerConf, Broker.BROKER_TLS_PREFIX);
      }
      singleStageBrokerRequestHandler =
          new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
              _queryQuotaManager, tableCache, nettyDefaults, tlsDefaults, _serverRoutingStatsManager,
              _failureDetector);
    }
    // The multi-stage handler, its throttler and the query dispatcher are only created when the multi-stage engine
    // is enabled; otherwise they remain null
    MultiStageBrokerRequestHandler multiStageBrokerRequestHandler = null;
    QueryDispatcher queryDispatcher = null;
    if (_brokerConf.getProperty(Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, Helix.DEFAULT_MULTI_STAGE_ENGINE_ENABLED)) {
      _multiStageQueryThrottler = new MultiStageQueryThrottler();
      _multiStageQueryThrottler.init(_spectatorHelixManager);
      // multi-stage request handler uses both Netty and GRPC ports.
      // worker requires both the "Netty port" for protocol transport; and "GRPC port" for mailbox transport.
      // TODO: decouple protocol and engine selection.
      // Within this method the dispatcher created here is only handed to the time-series handler below; it is not
      // passed to the MultiStageBrokerRequestHandler constructor
      queryDispatcher = createQueryDispatcher(_brokerConf);
      multiStageBrokerRequestHandler =
          new MultiStageBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
              _queryQuotaManager, tableCache, _multiStageQueryThrottler, _failureDetector);
    }
    // The time-series handler is created only when the enabled-languages config is non-blank. It needs the query
    // dispatcher, which exists only when the multi-stage engine is enabled, hence the precondition.
    TimeSeriesRequestHandler timeSeriesRequestHandler = null;
    if (StringUtils.isNotBlank(_brokerConf.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey()))) {
      Preconditions.checkNotNull(queryDispatcher, "Multistage Engine should be enabled to use time-series engine");
      timeSeriesRequestHandler = new TimeSeriesRequestHandler(_brokerConf, brokerId, _routingManager,
          _accessControlFactory, _queryQuotaManager, tableCache, queryDispatcher);
    }

    LOGGER.info("Initializing PinotFSFactory");
    PinotFSFactory.init(_brokerConf.subset(CommonConstants.Broker.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY));

    LOGGER.info("Initialize ResponseStore");
    PinotConfiguration responseStoreConfiguration =
        _brokerConf.subset(CommonConstants.CursorConfigs.PREFIX_OF_CONFIG_OF_RESPONSE_STORE);

    // The expiration interval is read from the full broker config, not from the response store subset
    String expirationTime = _brokerConf.getProperty(CommonConstants.CursorConfigs.RESULTS_EXPIRATION_INTERVAL,
        CommonConstants.CursorConfigs.DEFAULT_RESULTS_EXPIRATION_INTERVAL);

    // Look up the response store by its configured type, then initialize it with the config subset keyed by the
    // type the store itself reports
    _responseStore = (AbstractResponseStore) ResponseStoreService.getInstance().getResponseStore(
        responseStoreConfiguration.getProperty(CommonConstants.CursorConfigs.RESPONSE_STORE_TYPE,
            CommonConstants.CursorConfigs.DEFAULT_RESPONSE_STORE_TYPE));
    _responseStore.init(responseStoreConfiguration.subset(_responseStore.getType()), _hostname, _port, brokerId,
        _brokerMetrics, expirationTime);

    // The delegate receives all handlers; the multi-stage and time-series handlers may be null here
    _brokerRequestHandler =
        new BrokerRequestHandlerDelegate(singleStageBrokerRequestHandler, multiStageBrokerRequestHandler,
            timeSeriesRequestHandler, _responseStore);
    _brokerRequestHandler.start();

    // Enable/disable thread CPU time measurement through instance config.
    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(
        _brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT,
            CommonConstants.Broker.DEFAULT_ENABLE_THREAD_CPU_TIME_MEASUREMENT));
    // Enable/disable thread memory allocation tracking through instance config
    ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(
        _brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT,
            CommonConstants.Broker.DEFAULT_THREAD_ALLOCATED_BYTES_MEASUREMENT));
    // Fully qualified because the unqualified InstanceType in this method is the one used for the Helix managers
    Tracing.ThreadAccountantOps.initializeThreadAccountant(
        _brokerConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId,
        org.apache.pinot.spi.config.instance.InstanceType.BROKER);

    // Use the explicitly configured controller URL when present; otherwise build the executor from the spectator
    // Helix manager
    String controllerUrl = _brokerConf.getProperty(Broker.CONTROLLER_URL);
    if (controllerUrl != null) {
      _sqlQueryExecutor = new SqlQueryExecutor(controllerUrl);
    } else {
      _sqlQueryExecutor = new SqlQueryExecutor(_spectatorHelixManager);
    }
    LOGGER.info("Starting broker admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs));
    _brokerAdminApplication = createBrokerAdminApp();
    _brokerAdminApplication.start(_listenerConfigs);

    if (BrokerGrpcServer.isEnabled(_brokerConf)) {
      LOGGER.info("Initializing BrokerGrpcServer");
      _brokerGrpcServer = new BrokerGrpcServer(_brokerConf, brokerId, _brokerMetrics, _brokerRequestHandler);
      _brokerGrpcServer.start();
    } else {
      LOGGER.info("BrokerGrpcServer is not enabled");
    }

    LOGGER.info("Initializing cluster change mediator");
    // For each handler list: first init() the handlers that are already in the list, then append the built-in
    // handlers. The built-in handlers (_routingManager, _queryQuotaManager, _multiStageQueryThrottler) were already
    // initialized earlier in this method, so they are added after the loop and are not initialized a second time.
    for (ClusterChangeHandler clusterConfigChangeHandler : _clusterConfigChangeHandlers) {
      clusterConfigChangeHandler.init(_spectatorHelixManager);
    }
    _clusterConfigChangeHandlers.add(_queryQuotaManager);
    if (_multiStageQueryThrottler != null) {
      _clusterConfigChangeHandlers.add(_multiStageQueryThrottler);
    }
    for (ClusterChangeHandler idealStateChangeHandler : _idealStateChangeHandlers) {
      idealStateChangeHandler.init(_spectatorHelixManager);
    }
    _idealStateChangeHandlers.add(_routingManager);
    for (ClusterChangeHandler externalViewChangeHandler : _externalViewChangeHandlers) {
      externalViewChangeHandler.init(_spectatorHelixManager);
    }
    _externalViewChangeHandlers.add(_routingManager);
    _externalViewChangeHandlers.add(_queryQuotaManager);
    if (_multiStageQueryThrottler != null) {
      _externalViewChangeHandlers.add(_multiStageQueryThrottler);
    }
    for (ClusterChangeHandler instanceConfigChangeHandler : _instanceConfigChangeHandlers) {
      instanceConfigChangeHandler.init(_spectatorHelixManager);
    }
    _instanceConfigChangeHandlers.add(_routingManager);
    _instanceConfigChangeHandlers.add(_queryQuotaManager);
    // No built-in handler is appended to the live instance list, so it may legitimately be empty
    for (ClusterChangeHandler liveInstanceChangeHandler : _liveInstanceChangeHandlers) {
      liveInstanceChangeHandler.init(_spectatorHelixManager);
    }
    Map<ChangeType, List<ClusterChangeHandler>> clusterChangeHandlersMap = new HashMap<>();
    clusterChangeHandlersMap.put(ChangeType.CLUSTER_CONFIG, _clusterConfigChangeHandlers);
    clusterChangeHandlersMap.put(ChangeType.IDEAL_STATE, _idealStateChangeHandlers);
    clusterChangeHandlersMap.put(ChangeType.EXTERNAL_VIEW, _externalViewChangeHandlers);
    clusterChangeHandlersMap.put(ChangeType.INSTANCE_CONFIG, _instanceConfigChangeHandlers);
    // LIVE_INSTANCE is only mapped (and only listened to, see below) when at least one handler is present
    if (!_liveInstanceChangeHandlers.isEmpty()) {
      clusterChangeHandlersMap.put(ChangeType.LIVE_INSTANCE, _liveInstanceChangeHandlers);
    }
    // The mediator is started before it is registered as a listener on the spectator Helix manager
    _clusterChangeMediator = new ClusterChangeMediator(clusterChangeHandlersMap, _brokerMetrics);
    _clusterChangeMediator.start();
    _spectatorHelixManager.addIdealStateChangeListener(_clusterChangeMediator);
    _spectatorHelixManager.addExternalViewChangeListener(_clusterChangeMediator);
    _spectatorHelixManager.addInstanceConfigChangeListener(_clusterChangeMediator);
    // NOTE(review): "Clusterfig" appears to be the actual spelling of this method in the Helix API - confirm before
    // renaming
    _spectatorHelixManager.addClusterfigChangeListener(_clusterChangeMediator);
    if (!_liveInstanceChangeHandlers.isEmpty()) {
      _spectatorHelixManager.addLiveInstanceChangeListener(_clusterChangeMediator);
    }

    LOGGER.info("Connecting participant Helix manager");
    _participantHelixManager =
        HelixManagerFactory.getZKHelixManager(_clusterName, _instanceId, InstanceType.PARTICIPANT, _zkServers);
    // The state model factory and the message handler factory are both registered before connect() is called
    // Register state model factory
    _participantHelixManager.getStateMachineEngine()
        .registerStateModelFactory(BrokerResourceOnlineOfflineStateModelFactory.getStateModelDef(),
            new BrokerResourceOnlineOfflineStateModelFactory(_propertyStore, _helixDataAccessor, _routingManager,
                _queryQuotaManager));
    // Register user-define message handler factory
    _participantHelixManager.getMessagingService()
        .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
            new BrokerUserDefinedMessageHandlerFactory(_routingManager, _queryQuotaManager));
    _participantHelixManager.connect();
    updateInstanceConfigAndBrokerResourceIfNeeded();
    // Gauge reports 1 while the participant manager is connected and 0 otherwise
    _brokerMetrics.addCallbackGauge(Helix.INSTANCE_CONNECTED_METRIC_NAME,
        () -> _participantHelixManager.isConnected() ? 1L : 0L);
    // Registered after the initial connect() above, so presumably this only counts later reconnects - confirm
    // against the Helix pre-connect callback semantics
    _participantHelixManager.addPreConnectCallback(
        () -> _brokerMetrics.addMeteredGlobalValue(BrokerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L));

    // Initializing Groovy execution security
    // The query-specific analyzer config takes precedence; the "all" analyzer config is used as its default
    GroovyFunctionEvaluator.configureGroovySecurity(
        _brokerConf.getProperty(CommonConstants.Groovy.GROOVY_QUERY_STATIC_ANALYZER_CONFIG,
            _brokerConf.getProperty(CommonConstants.Groovy.GROOVY_ALL_STATIC_ANALYZER_CONFIG)));

    // Register the service status handler
    registerServiceStatusHandler();

    // Only reached when every step above succeeded; on failure the flag stays true
    _isStarting = false;
    _brokerMetrics.addTimedValue(BrokerTimer.STARTUP_SUCCESS_DURATION_MS,
        System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS);
    LOGGER.info("Finish starting Pinot broker");
  }