in src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java [115:222]
/**
 * Runs the start-up sequence for this node.
 *
 * <p>In order, the method:
 * <ol>
 *   <li>starts {@code NCircuitBreaker} and reads the node-role flags from the config;</li>
 *   <li>writes the distributed-lock JDBC URL back into the config before
 *       {@code kylin.metadata.url} can be rewritten;</li>
 *   <li>prepares the metadata store: nodes that are not query-only create the metastore UUID
 *       if it is missing and register role-specific listeners, while query-only nodes attach
 *       a {@code JdbcAuditLogStore} and may switch {@code kylin.metadata.url} to the
 *       {@code @hdfs} form;</li>
 *   <li>initializes the distributed lock factory, warms caches (skipped on resource nodes) and
 *       publishes {@code AfterMetadataReadyEvent};</li>
 *   <li>registers query-node listeners, the process/cache/ACL listeners, optionally replaces
 *       the HTTP firewall, calls {@code postInit()} and binds the storage-clean thread pool.</li>
 * </ol>
 *
 * <p>The statement order is significant: several URLs are copied back into the config before
 * {@code kylin.metadata.url} is modified.
 *
 * @throws Exception if any initialization step fails (declared broadly; the concrete types
 *         depend on the called components)
 */
public void init() throws Exception {
    val kylinConfig = KylinConfig.getInstanceFromEnv();
    NCircuitBreaker.start(KapConfig.wrap(kylinConfig));
    boolean isJob = kylinConfig.isJobNode();
    boolean isDataLoading = kylinConfig.isDataLoadingNode();
    boolean isMetadata = kylinConfig.isMetadataNode();
    boolean isQueryOnly = kylinConfig.isQueryNodeOnly();
    boolean isResource = kylinConfig.isResource();
    // set kylin.metadata.distributed-lock.jdbc.url
    // before kylin.metadata.url is changed
    kylinConfig.setJDBCDistributedLockURL(kylinConfig.getJDBCDistributedLockURL().toString());
    if (!isQueryOnly) {
        // restore from metadata, should not delete
        val resourceStore = ResourceStore.getKylinMetaStore(kylinConfig);
        // Create the metastore UUID if it does not exist yet, inside a checked-and-retried
        // unit of work scoped to the global project.
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            ResourceStore.getKylinMetaStore(KylinConfig.getInstanceFromEnv()).createMetaStoreUuidIfNotExist();
            return true;
        }, ResourceStore.GLOBAL_PROJECT);
        // Resource nodes skip the catch-up step.
        if (!isResource) {
            resourceStore.catchup();
        }
        InitResourceGroupUtils.initResourceGroup();
        if (isJob || isDataLoading) {
            // register scheduler listener
            EventBusFactory.getInstance().register(new JobSchedulerListener(), false);
            // Only the stats store depends on streaming being enabled. The braces make the
            // scope of this condition explicit; it always guarded this single statement.
            if (kylinConfig.isStreamingConfigEnabled()) {
                streamingJobStatsStore = new JdbcStreamingJobStatsStore(kylinConfig);
            }
            // register streaming job listener (not guarded by the streaming flag)
            // NOTE(review): confirm the listener is meant to be registered even when
            // streaming is disabled.
            EventBusFactory.getInstance().register(new StreamingJobListener(), true);
        }
        if (isJob || isMetadata) {
            EventBusFactory.getInstance().register(new ModelBrokenListener(), false);
        }
        SparkJobFactoryUtils.initJobFactory();
        ComputedColumnUtil.setEXTRACTOR(ComputedColumnRewriter::extractCcRexNode);
    } else {
        // Query-only node.
        val auditLogStore = new JdbcAuditLogStore(kylinConfig);
        // Copy the current values of these URLs back into the config before
        // kylin.metadata.url is rewritten below.
        // NOTE(review): presumably they are derived from kylin.metadata.url when not set
        // explicitly - confirm against KylinConfig.
        kylinConfig.setQueryHistoryUrl(kylinConfig.getQueryHistoryUrl().toString());
        kylinConfig.setJDBCQueryHistoryURL(kylinConfig.getQueryHistoryUrl().toString());
        kylinConfig.setStreamingStatsUrl(kylinConfig.getStreamingStatsUrl().toString());
        kylinConfig.setJdbcShareStateUrl(kylinConfig.getJdbcShareStateUrl().toString());
        if (kylinConfig.getMetadataStoreType().equals("hdfs")) {
            // cache db metadata url before switch to hdfs
            kylinConfig.setCoreMetadataDBUrl();
            kylinConfig.setProperty("kylin.metadata.url", kylinConfig.getMetadataUrlPrefix() + "@hdfs");
        }
        // The store is obtained only after the metadata URL has its final value.
        val resourceStore = ResourceStore.getKylinMetaStore(kylinConfig);
        resourceStore.getMetadataStore().setAuditLogStore(auditLogStore);
        resourceStore.catchup();
    }
    // Applies to every node role; for query-only nodes this repeats the assignment made above.
    kylinConfig.setQueryHistoryUrl(kylinConfig.getQueryHistoryUrl().toString());
    kylinConfig.getDistributedLockFactory().initialize();
    // Resource nodes skip the cache warm-up.
    if (!isResource) {
        warmUpSystemCache();
        cacheCcRexNode();
    }
    context.publishEvent(new AfterMetadataReadyEvent(context));
    if (kylinConfig.isQueryNode()) {
        // Publish the Sparder start event as async or sync, depending on configuration.
        if (kylinConfig.isSparderAsync()) {
            context.publishEvent(new SparderStartEvent.AsyncEvent(context));
        } else {
            context.publishEvent(new SparderStartEvent.SyncEvent(context));
        }
        // register acl update listener
        EventListenerRegistry.getInstance(kylinConfig).register(new AclTCRListener(queryCacheManager), "acl");
        // register schema change listener, for clean query cache
        EventListenerRegistry.getInstance(kylinConfig).register(new TableSchemaChangeListener(queryCacheManager),
                "table");
        EventBusFactory.getInstance().register(new QueryMetricsListener(), false);
        if (kylinConfig.isBloomCollectFilterEnabled()) {
            QueryFiltersCollector.initScheduler();
        }
    }
    // Listeners registered on every node, regardless of role.
    EventBusFactory.getInstance().register(ProcessStatusListener.getInstance(), true);
    // register for clean cache when delete
    EventListenerRegistry.getInstance(kylinConfig).register(new CacheCleanListener(), "cacheInManager");
    EventBusFactory.getInstance().register(new UserAclListener(), true);
    if (kylinConfig.isAllowNonAsciiCharInUrl()) {
        // Note: DefaultHttpFirewall vs StrictHttpFirewall
        // In order to allow Chinese chars on URL like "/{cubeName}/segments",
        // we have to use DefaultHttpFirewall.
        // If later we have to use StrictHttpFirewall,
        // then StrictHttpFirewall.rejectNonPrintableAsciiCharactersInFieldName()
        // must be overridden to allow Chinese chars on URL.
        FilterChainProxy filterChainProxy = context.getBean(FilterChainProxy.class);
        filterChainProxy.setFirewall(this.getHttpFirewall());
    }
    postInit();
    log.info("Kylin initialization completed.");
    log.info("KylinConfig in env, ID is {}", kylinConfig.hashCode());
    log.info("KylinConfig in env, metadata is {}", kylinConfig.getMetadataUrl());
    log.info("KylinConfig in env, working dir is {}", kylinConfig.getHdfsWorkingDirectory());
    // Init global static instances
    // The supplier creates the "clean-storages-pool" pool, sized from the configured
    // storage-clean task concurrency.
    CleanTaskExecutorService.getInstance().bindWorkingPool(() -> PriorityExecutor
            .newWorkingThreadPool("clean-storages-pool", kylinConfig.getStorageCleanTaskConcurrency()));
}