public void init()

in src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java [115:222]


    public void init() throws Exception {

        val kylinConfig = KylinConfig.getInstanceFromEnv();
        NCircuitBreaker.start(KapConfig.wrap(kylinConfig));

        boolean isJob = kylinConfig.isJobNode();
        boolean isDataLoading = kylinConfig.isDataLoadingNode();
        boolean isMetadata = kylinConfig.isMetadataNode();
        boolean isQueryOnly = kylinConfig.isQueryNodeOnly();
        boolean isResource = kylinConfig.isResource();

        // set kylin.metadata.distributed-lock.jdbc.url
        // before kylin.metadata.url is changed
        kylinConfig.setJDBCDistributedLockURL(kylinConfig.getJDBCDistributedLockURL().toString());

        if (!isQueryOnly) {
            // restore from metadata, should not delete
            val resourceStore = ResourceStore.getKylinMetaStore(kylinConfig);
            EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
                ResourceStore.getKylinMetaStore(KylinConfig.getInstanceFromEnv()).createMetaStoreUuidIfNotExist();
                return true;
            }, ResourceStore.GLOBAL_PROJECT);
            if (!isResource) {
                resourceStore.catchup();
            }
            InitResourceGroupUtils.initResourceGroup();
            if (isJob || isDataLoading) {
                // register scheduler listener
                EventBusFactory.getInstance().register(new JobSchedulerListener(), false);
                if (kylinConfig.isStreamingConfigEnabled())
                    streamingJobStatsStore = new JdbcStreamingJobStatsStore(kylinConfig);
                // register scheduler listener
                EventBusFactory.getInstance().register(new StreamingJobListener(), true);
            }
            if (isJob || isMetadata) {
                EventBusFactory.getInstance().register(new ModelBrokenListener(), false);
            }

            SparkJobFactoryUtils.initJobFactory();
            ComputedColumnUtil.setEXTRACTOR(ComputedColumnRewriter::extractCcRexNode);
        } else {
            val auditLogStore = new JdbcAuditLogStore(kylinConfig);
            kylinConfig.setQueryHistoryUrl(kylinConfig.getQueryHistoryUrl().toString());
            kylinConfig.setJDBCQueryHistoryURL(kylinConfig.getQueryHistoryUrl().toString());
            kylinConfig.setStreamingStatsUrl(kylinConfig.getStreamingStatsUrl().toString());
            kylinConfig.setJdbcShareStateUrl(kylinConfig.getJdbcShareStateUrl().toString());
            if (kylinConfig.getMetadataStoreType().equals("hdfs")) {
                // cache db metadata url before switch to hdfs
                kylinConfig.setCoreMetadataDBUrl();
                kylinConfig.setProperty("kylin.metadata.url", kylinConfig.getMetadataUrlPrefix() + "@hdfs");
            }
            val resourceStore = ResourceStore.getKylinMetaStore(kylinConfig);
            resourceStore.getMetadataStore().setAuditLogStore(auditLogStore);
            resourceStore.catchup();
        }

        kylinConfig.setQueryHistoryUrl(kylinConfig.getQueryHistoryUrl().toString());
        kylinConfig.getDistributedLockFactory().initialize();
        if (!isResource) {
            warmUpSystemCache();
            cacheCcRexNode();
        }
        context.publishEvent(new AfterMetadataReadyEvent(context));

        if (kylinConfig.isQueryNode()) {
            if (kylinConfig.isSparderAsync()) {
                context.publishEvent(new SparderStartEvent.AsyncEvent(context));
            } else {
                context.publishEvent(new SparderStartEvent.SyncEvent(context));
            }
            // register acl update listener
            EventListenerRegistry.getInstance(kylinConfig).register(new AclTCRListener(queryCacheManager), "acl");
            // register schema change listener, for clean query cache
            EventListenerRegistry.getInstance(kylinConfig).register(new TableSchemaChangeListener(queryCacheManager),
                    "table");
            EventBusFactory.getInstance().register(new QueryMetricsListener(), false);
            if (kylinConfig.isBloomCollectFilterEnabled()) {
                QueryFiltersCollector.initScheduler();
            }
        }
        EventBusFactory.getInstance().register(ProcessStatusListener.getInstance(), true);
        // register for clean cache when delete
        EventListenerRegistry.getInstance(kylinConfig).register(new CacheCleanListener(), "cacheInManager");

        EventBusFactory.getInstance().register(new UserAclListener(), true);

        if (kylinConfig.isAllowNonAsciiCharInUrl()) {
            // Note: DefaultHttpFirewall vs StrictHttpFirewall
            // In order to allow Chinese chars on URL like "/{cubeName}/segments",
            // we have to use DefaultHttpFirewall.
            // If later we have to use StrictHttpFirewall,
            // then StrictHttpFirewall.rejectNonPrintableAsciiCharactersInFieldName()
            // must be overridden to allow Chinese chars on URL.
            FilterChainProxy filterChainProxy = context.getBean(FilterChainProxy.class);
            filterChainProxy.setFirewall(this.getHttpFirewall());
        }

        postInit();

        log.info("Kylin initialization completed.");
        log.info("KylinConfig in env, ID is {}", kylinConfig.hashCode());
        log.info("KylinConfig in env, metadata is {}", kylinConfig.getMetadataUrl());
        log.info("KylinConfig in env, working dir is {}", kylinConfig.getHdfsWorkingDirectory());

        // Init global static instances
        CleanTaskExecutorService.getInstance().bindWorkingPool(() -> PriorityExecutor
                .newWorkingThreadPool("clean-storages-pool", kylinConfig.getStorageCleanTaskConcurrency()));
    }