in pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java [465:636]
/**
 * Sets up and starts the Pinot controller components. The order of the steps matters: several components can only
 * be created after the ones before them have been initialized or started (see the inline comments).
 *
 * <p>The sequence is: install the default SSL context when a key store or trust store is configured, set up the
 * Pinot cluster in Helix, initialize the factories, connect to Helix as a participant, start the lead controller
 * manager and the Helix resource manager, create the task / realtime segment / segment completion managers and the
 * size, quota and utilization helpers, start the periodic task scheduler, register the controller objects for
 * injection into the admin application and start it, and finally register the data directory gauges and the
 * service status callback.
 *
 * @throws RuntimeException if the configured AccessControlFactory cannot be instantiated or initialized
 */
private void setUpPinotController() {
  // install default SSL context if necessary (even if not force-enabled everywhere)
  TlsConfig tlsDefaults = TlsUtils.extractTlsConfig(_config, ControllerConf.CONTROLLER_TLS_PREFIX);
  if (StringUtils.isNotBlank(tlsDefaults.getKeyStorePath()) || StringUtils.isNotBlank(
      tlsDefaults.getTrustStorePath())) {
    LOGGER.info("Installing default SSL context for any client requests");
    TlsUtils.installDefaultSSLSocketFactory(tlsDefaults);
  }

  // Set up Pinot cluster in Helix if needed
  HelixSetupUtils.setupPinotCluster(_helixClusterName, _helixZkURL, _isUpdateStateModel, _enableBatchMessageMode,
      _config);

  // Start all components
  initPinotFSFactory();
  initControllerFilePathProvider();
  initSegmentFetcherFactory();
  initPinotCrypterFactory();

  LOGGER.info("Initializing QueryRewriterFactory");
  QueryRewriterFactory.init(
      _config.getProperty(CommonConstants.Controller.CONFIG_OF_CONTROLLER_QUERY_REWRITER_CLASS_NAMES));

  LOGGER.info("Initializing Helix participant manager");
  _helixParticipantManager =
      HelixManagerFactory.getZKHelixManager(_helixClusterName, _helixParticipantInstanceId, InstanceType.PARTICIPANT,
          _helixZkURL);

  // LeadControllerManager needs to be initialized before registering as Helix participant.
  LOGGER.info("Initializing lead controller manager");
  _leadControllerManager =
      new LeadControllerManager(_helixControllerInstanceId, _helixParticipantManager, _controllerMetrics);

  LOGGER.info("Registering and connecting Helix participant manager as Helix Participant role");
  registerAndConnectAsHelixParticipant();

  // LeadControllerManager needs to be started after the connection
  // as it can check Helix leadership and resource config only after connecting to Helix cluster.
  LOGGER.info("Starting lead controller manager");
  _leadControllerManager.start();

  LOGGER.info("Starting Pinot Helix resource manager and connecting to Zookeeper");
  _helixResourceManager.start(_helixParticipantManager, _controllerMetrics);

  // Initialize segment lifecycle event listeners
  PinotSegmentLifecycleEventListenerManager.getInstance().init(_helixParticipantManager);

  LOGGER.info("Starting task resource manager");
  _helixTaskResourceManager =
      new PinotHelixTaskResourceManager(_helixResourceManager, new TaskDriver(_helixParticipantManager),
          _config.getPinotTaskExpireTimeInMs());

  // Helix resource manager must be started in order to create PinotLLCRealtimeSegmentManager
  LOGGER.info("Starting realtime segment manager");
  _pinotLLCRealtimeSegmentManager = createPinotLLCRealtimeSegmentManager();
  // TODO: Need to put this inside HelixResourceManager when HelixControllerLeadershipManager is removed.
  _helixResourceManager.registerPinotLLCRealtimeSegmentManager(_pinotLLCRealtimeSegmentManager);

  SegmentCompletionConfig segmentCompletionConfig = new SegmentCompletionConfig(_config);
  _segmentCompletionManager =
      new SegmentCompletionManager(_helixParticipantManager, _pinotLLCRealtimeSegmentManager, _controllerMetrics,
          _leadControllerManager, _config.getSegmentCommitTimeoutSeconds(), segmentCompletionConfig);

  _sqlQueryExecutor = new SqlQueryExecutor(_config.generateVipUrl());

  _connectionManager = PoolingHttpClientConnectionManagerHelper.createWithSocketFactory();
  // Hand the configured value over in seconds instead of multiplying by 1000 by hand: the manual conversion could
  // overflow (and yield a negative duration) for large values if the getter returns an int.
  _connectionManager.setDefaultSocketConfig(
      SocketConfig.custom()
          .setSoTimeout(Timeout.of(_config.getServerAdminRequestTimeoutSeconds(), TimeUnit.SECONDS))
          .build());

  _tableSizeReader =
      new TableSizeReader(_executorService, _connectionManager, _controllerMetrics, _helixResourceManager,
          _leadControllerManager);
  _helixResourceManager.registerTableSizeReader(_tableSizeReader);
  _storageQuotaChecker = new StorageQuotaChecker(_tableSizeReader, _controllerMetrics, _leadControllerManager,
      _helixResourceManager, _config);
  _diskUtilizationChecker = new DiskUtilizationChecker(_helixResourceManager, _config);
  _resourceUtilizationManager = new ResourceUtilizationManager(_config, _diskUtilizationChecker);

  // Setting up periodic tasks
  List<PeriodicTask> controllerPeriodicTasks = setupControllerPeriodicTasks();
  LOGGER.info("Init controller periodic tasks scheduler");
  _periodicTaskScheduler = new PeriodicTaskScheduler();
  _periodicTaskScheduler.init(controllerPeriodicTasks);
  _periodicTaskScheduler.start();

  // Register message handler for incoming user-defined helix messages.
  _helixParticipantManager.getMessagingService()
      .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
          new ControllerUserDefinedMessageHandlerFactory(_periodicTaskScheduler));

  String accessControlFactoryClass = _config.getAccessControlFactoryClass();
  LOGGER.info("Use class: {} as the AccessControlFactory", accessControlFactoryClass);
  final AccessControlFactory accessControlFactory;
  try {
    // Class#newInstance() is deprecated since Java 9; go through the no-arg constructor explicitly. Any failure
    // (class not found, no such constructor, constructor or init() throwing) is still caught and wrapped below.
    accessControlFactory =
        (AccessControlFactory) Class.forName(accessControlFactoryClass).getDeclaredConstructor().newInstance();
    accessControlFactory.init(_config, _helixResourceManager);
  } catch (Exception e) {
    throw new RuntimeException("Caught exception while creating new AccessControlFactory instance", e);
  }

  final MetadataEventNotifierFactory metadataEventNotifierFactory =
      MetadataEventNotifierFactory.loadFactory(_config.subset(METADATA_EVENT_NOTIFIER_PREFIX), _helixResourceManager);

  LOGGER.info("Controller download url base: {}", _config.generateVipUrl());
  LOGGER.info("Injecting configuration and resource managers to the API context");
  // register all the controller objects for injection to jersey resources
  Instant controllerStartTime = Instant.now();
  _adminApp.registerBinder(new AbstractBinder() {
    @Override
    protected void configure() {
      bind(_config).to(ControllerConf.class);
      bind(_helixParticipantInstanceId).named(CONTROLLER_INSTANCE_ID);
      bind(_helixResourceManager).to(PinotHelixResourceManager.class);
      bind(_helixTaskResourceManager).to(PinotHelixTaskResourceManager.class);
      bind(_segmentCompletionManager).to(SegmentCompletionManager.class);
      bind(_taskManager).to(PinotTaskManager.class);
      bind(_taskManagerStatusCache).to(TaskManagerStatusCache.class);
      bind(_connectionManager).to(HttpClientConnectionManager.class);
      bind(_executorService).to(Executor.class);
      bind(_controllerMetrics).to(ControllerMetrics.class);
      bind(accessControlFactory).to(AccessControlFactory.class);
      bind(metadataEventNotifierFactory).to(MetadataEventNotifierFactory.class);
      bind(_leadControllerManager).to(LeadControllerManager.class);
      bind(_periodicTaskScheduler).to(PeriodicTaskScheduler.class);
      bind(_sqlQueryExecutor).to(SqlQueryExecutor.class);
      bind(_pinotLLCRealtimeSegmentManager).to(PinotLLCRealtimeSegmentManager.class);
      bind(_tenantRebalancer).to(TenantRebalancer.class);
      bind(_tableSizeReader).to(TableSizeReader.class);
      bind(_storageQuotaChecker).to(StorageQuotaChecker.class);
      bind(_diskUtilizationChecker).to(DiskUtilizationChecker.class);
      bind(_resourceUtilizationManager).to(ResourceUtilizationManager.class);
      bind(controllerStartTime).named(ControllerAdminApiApplication.START_TIME);
      // Serve log files from the configured root directory; fall back to the dummy server when none is configured
      String loggerRootDir = _config.getProperty(CommonConstants.Controller.CONFIG_OF_LOGGER_ROOT_DIR);
      if (loggerRootDir != null) {
        bind(new LocalLogFileServer(loggerRootDir)).to(LogFileServer.class);
      } else {
        bind(new DummyLogFileServer()).to(LogFileServer.class);
      }
    }
  });

  LOGGER.info("Starting controller admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs));
  _adminApp.start(_listenerConfigs);

  enforceTableConfigAndSchema();

  // Gauge reporting 1 when the data directory exists, 0 otherwise
  _controllerMetrics.addCallbackGauge("dataDir.exists", () -> new File(_config.getDataDir()).exists() ? 1L : 0L);
  // Gauge reporting how long it takes to write and delete a small probe file in the data directory
  _controllerMetrics.addCallbackGauge("dataDir.fileOpLatencyMs", () -> {
    File dataDir = new File(_config.getDataDir());
    if (!dataDir.exists()) {
      return DATA_DIRECTORY_MISSING_VALUE;
    }
    try {
      long startTime = System.currentTimeMillis();
      File testFile = new File(dataDir, _config.getControllerHost());
      try (OutputStream outputStream = new FileOutputStream(testFile, false)) {
        outputStream.write(Longs.toByteArray(System.currentTimeMillis()));
      } finally {
        // Runs after the stream is closed; removes the probe file even when the write or close failed so that a
        // failing check does not leave a stray file behind in the data directory.
        FileUtils.deleteQuietly(testFile);
      }
      return System.currentTimeMillis() - startTime;
    } catch (IOException e) {
      LOGGER.warn("Caught exception while checking the data directory operation latency", e);
      return DATA_DIRECTORY_EXCEPTION_VALUE;
    }
  });

  _serviceStatusCallbackList.add(generateServiceStatusCallback(_helixParticipantManager));
}